risingwave_stream/executor/mview/materialize.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::ops::Bound;
17
18use bytes::Bytes;
19use futures::future::Either;
20use futures::stream::{self, select_with_strategy};
21use futures_async_stream::try_stream;
22use itertools::Itertools;
23use risingwave_common::array::Op;
24use risingwave_common::bitmap::Bitmap;
25use risingwave_common::catalog::{
26    ColumnDesc, ConflictBehavior, TableId, checked_conflict_behaviors,
27};
28use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
29use risingwave_common::row::{OwnedRow, RowExt};
30use risingwave_common::types::DataType;
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::BasicSerde;
33use risingwave_hummock_sdk::HummockReadEpoch;
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_pb::id::{SourceId, SubscriberId};
37use risingwave_pb::stream_plan::SubscriptionUpstreamInfo;
38use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
39use risingwave_storage::store::{PrefetchOptions, TryWaitEpochOptions};
40use risingwave_storage::table::KeyedRow;
41
42use crate::common::change_buffer::output_kind as cb_kind;
43use crate::common::metrics::MetricsInfo;
44use crate::common::table::state_table::{
45    StateTableBuilder, StateTableInner, StateTableOpConsistencyLevel,
46};
47use crate::executor::error::ErrorKind;
48use crate::executor::monitor::MaterializeMetrics;
49use crate::executor::mview::RefreshProgressTable;
50use crate::executor::mview::cache::MaterializeCache;
51use crate::executor::prelude::*;
52use crate::executor::{BarrierInner, BarrierMutationType, EpochPair};
53use crate::task::LocalBarrierManager;
54
/// Internal state machine driving [`MaterializeExecutor`]'s main execution loop.
///
/// `M` is the barrier mutation type carried by [`BarrierInner`].
#[derive(Debug, Clone)]
pub enum MaterializeStreamState<M> {
    /// Default stage: apply upstream changes to the state table and forward
    /// them downstream.
    NormalIngestion,
    /// Refresh stage: merge-sort the main table against the staging table to
    /// find rows to delete (only entered for refreshable materialized views).
    MergingData,
    /// Refresh stage: presumably purges refresh bookkeeping after merging
    /// finishes — handling is outside this view, confirm in `execute_inner`.
    CleanUp,
    /// Commit in-flight state on `barrier`, yield it downstream, then
    /// transition to `expect_next_state`.
    CommitAndYieldBarrier {
        barrier: BarrierInner<M>,
        // Boxed to keep the enum from being recursive by value.
        expect_next_state: Box<MaterializeStreamState<M>>,
    },
    /// Refresh finished at `on_complete_epoch` — exact semantics handled
    /// outside this view.
    RefreshEnd {
        on_complete_epoch: EpochPair,
    },
}
68
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to materialize.
    input: Executor,

    /// Output schema of this executor (used to build output chunks).
    schema: Schema,

    /// The state table backing the materialized view.
    state_table: StateTableInner<S, SD>,

    /// Stream key indices of the *output* of this materialize node.
    ///
    /// This can be different from the table PK. Typically, there are 3 cases:
    ///
    /// - Normal `TABLE`: `stream_key == pk`, so no special handling is needed.
    /// - TTL-ed `TABLE` (`WITH TTL`): `stream_key` is a superset of `pk` by appending the TTL
    ///   watermark column. In this case, updates to the same pk may have different stream keys, so
    ///   we must rewrite such `Update` into `Delete + Insert` before yielding.
    /// - `MV` or `INDEX`: `pk` can be a superset of `stream_key` to also include the order key or
    ///   distribution key specified by the user. No special handling is needed either.
    stream_key_indices: Vec<usize>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    /// Context of the actor running this executor (ids, config, initial
    /// dispatcher/subscriber info).
    actor_context: ActorContextRef,

    /// The cache for conflict handling. `None` if conflict behavior is `NoCheck`.
    materialize_cache: Option<MaterializeCache>,

    /// How to resolve rows conflicting on the primary key.
    conflict_behavior: ConflictBehavior,

    /// Whether the table can clean itself by TTL watermark, i.e., is defined with `WATERMARK ... WITH TTL`.
    cleaned_by_ttl_watermark: bool,

    /// Indices of version columns; when empty, the "no version column"
    /// fast path in conflict handling may apply (see `execute_inner`).
    version_column_indices: Vec<u32>,

    /// Whether any downstream depends on this table at creation time
    /// (`initial_dispatch_num != 0`); used to relax consistency for
    /// `Overwrite` tables with no consumers.
    may_have_downstream: bool,

    /// Initial subscriber ids on this table; non-empty enables the
    /// log-store-backed consistency level.
    subscriber_ids: HashSet<SubscriberId>,

    /// Executor-level materialization metrics (e.g. input row count).
    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,

    /// Optional refresh arguments and state for refreshable materialized views
    refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,

    /// Local barrier manager for reporting barrier events
    local_barrier_manager: LocalBarrierManager,
}
120
/// Arguments and state for refreshable materialized views.
pub struct RefreshableMaterializeArgs<S: StateStore, SD: ValueRowSerde> {
    /// Table catalog for the main table.
    pub table_catalog: Table,

    /// Table catalog for the staging table.
    pub staging_table_catalog: Table,

    /// Flag indicating if this table is currently being refreshed.
    /// Set on recovery when the progress table reports incomplete vnodes.
    pub is_refreshing: bool,

    /// During data refresh (between `RefreshStart` and `LoadFinish`),
    /// data will be written to both the main table and the staging table.
    ///
    /// The staging table is PK-only.
    ///
    /// After `LoadFinish`, we will do a `DELETE FROM main_table WHERE pk NOT IN (SELECT pk FROM staging_table)`, and then purge the staging table.
    pub staging_table: StateTableInner<S, SD>,

    /// Progress table for tracking refresh state per `VNode` for fault tolerance.
    pub progress_table: RefreshProgressTable<S>,

    /// Table ID for this refreshable materialized view (the main table's id).
    pub table_id: TableId,
}
146
147impl<S: StateStore, SD: ValueRowSerde> RefreshableMaterializeArgs<S, SD> {
148    /// Create new `RefreshableMaterializeArgs`
149    pub async fn new(
150        store: S,
151        table_catalog: &Table,
152        staging_table_catalog: &Table,
153        progress_state_table: &Table,
154        vnodes: Option<Arc<Bitmap>>,
155    ) -> Self {
156        let table_id = table_catalog.id;
157
158        // staging table is pk-only, and we don't need to check value consistency
159        let staging_table = StateTableInner::from_table_catalog_inconsistent_op(
160            staging_table_catalog,
161            store.clone(),
162            vnodes.clone(),
163        )
164        .await;
165
166        let progress_state_table = StateTableInner::from_table_catalog_inconsistent_op(
167            progress_state_table,
168            store,
169            vnodes,
170        )
171        .await;
172
173        // Get primary key length from main table catalog
174        let pk_len = table_catalog.pk.len();
175        let progress_table = RefreshProgressTable::new(progress_state_table, pk_len);
176
177        debug_assert_eq!(staging_table.vnodes(), progress_table.vnodes());
178
179        Self {
180            table_catalog: table_catalog.clone(),
181            staging_table_catalog: staging_table_catalog.clone(),
182            is_refreshing: false,
183            staging_table,
184            progress_table,
185            table_id,
186        }
187    }
188}
189
190fn get_op_consistency_level(
191    cleaned_by_ttl_watermark: bool,
192    conflict_behavior: ConflictBehavior,
193    may_have_downstream: bool,
194    subscriber_ids: &HashSet<SubscriberId>,
195) -> StateTableOpConsistencyLevel {
196    if cleaned_by_ttl_watermark {
197        // For tables with watermark TTL. Due to async state cleaning, it's uncertain whether an expired
198        // key has been cleaned up by the storage or not, thus we are not sure if to write `Update` or `Insert`
199        // if the key appears again.
200        StateTableOpConsistencyLevel::Inconsistent
201    } else if !subscriber_ids.is_empty() {
202        StateTableOpConsistencyLevel::LogStoreEnabled
203    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
204        // Table with overwrite conflict behavior could disable conflict check
205        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
206        StateTableOpConsistencyLevel::Inconsistent
207    } else {
208        StateTableOpConsistencyLevel::ConsistentOldValue
209    }
210}
211
212impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
213    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
214    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
215    /// should be `None`.
216    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_indices: Vec<u32>,
        metrics: Arc<StreamingMetrics>,
        refresh_args: Option<RefreshableMaterializeArgs<S, SD>>,
        cleaned_by_ttl_watermark: bool,
        local_barrier_manager: LocalBarrierManager,
    ) -> Self {
        // Decode the materialized column descriptors from the catalog protobuf.
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Extract TOAST-able column indices from table columns.
        // Only for PostgreSQL CDC tables.
        let toastable_column_indices = if table_catalog.cdc_table_type()
            == risingwave_pb::catalog::table::CdcTableType::Postgres
        {
            let toastable_indices: Vec<usize> = table_columns
                .iter()
                .enumerate()
                .filter_map(|(index, column)| match &column.data_type {
                    // Currently supports TOAST updates for:
                    // - jsonb (DataType::Jsonb)
                    // - varchar (DataType::Varchar)
                    // - bytea (DataType::Bytea)
                    // - One-dimensional arrays of the above types (DataType::List)
                    //   Note: Some array types may not be fully supported yet, see issue
                    //   https://github.com/risingwavelabs/risingwave/issues/22916 for details.

                    // For details on how TOAST values are handled, see comments in `is_debezium_unavailable_value`.
                    DataType::Varchar | DataType::List(_) | DataType::Bytea | DataType::Jsonb => {
                        Some(index)
                    }
                    _ => None,
                })
                .collect();

            // Normalize "no TOAST-able columns" to `None` so downstream checks are a
            // simple `Option` test.
            if toastable_indices.is_empty() {
                None
            } else {
                Some(toastable_indices)
            }
        } else {
            None
        };

        // Serde over the value columns only (`value_indices` of the catalog).
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let stream_key_indices: Vec<usize> = table_catalog
            .stream_key
            .iter()
            .map(|idx| *idx as usize)
            .collect();
        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // Downstream existence / subscribers are sampled at actor creation time and
        // drive the initial op-consistency level below.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let subscriber_ids = actor_context.initial_subscriber_ids.clone();
        let op_consistency_level = get_op_consistency_level(
            cleaned_by_ttl_watermark,
            conflict_behavior,
            may_have_downstream,
            &subscriber_ids,
        );
        let state_table_metrics = metrics.new_state_table_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        // Note: The current implementation could potentially trigger a switch on the
        // inconsistent_op flag. If the storage relies on this flag to perform optimizations, it
        // would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(op_consistency_level)
            .enable_preload_all_rows_by_config(&actor_context.config)
            .enable_vnode_key_stats(
                actor_context
                    .config
                    .developer
                    .enable_vnode_key_stats_for_materialize,
                &actor_context.config,
            )
            .with_metrics(state_table_metrics)
            .build()
            .await;

        let mv_metrics = metrics.new_materialize_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );
        let cache_metrics = metrics.new_materialize_cache_metrics(
            table_catalog.id,
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only iceberg tables bypass hummock entirely; this executor then only
        // forwards chunks (see the `is_dummy_table` branch in `execute_inner`).
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            stream_key_indices,
            arrange_key_indices,
            actor_context,
            // NOTE(review): the field is `Option<MaterializeCache>`, so
            // `MaterializeCache::new` presumably returns `None` for `NoCheck`
            // conflict behavior — confirm against the cache module.
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_indices.clone(),
                conflict_behavior,
                toastable_column_indices,
                cache_metrics,
            ),
            conflict_behavior,
            cleaned_by_ttl_watermark,
            version_column_indices,
            is_dummy_table,
            may_have_downstream,
            subscriber_ids,
            metrics: mv_metrics,
            refresh_args,
            local_barrier_manager,
        }
    }
355
356    #[try_stream(ok = Message, error = StreamExecutorError)]
357    async fn execute_inner(mut self) {
358        let mv_table_id = self.state_table.table_id();
359        let data_types = self.schema.data_types();
360        let mut input = self.input.execute();
361
362        let barrier = expect_first_barrier(&mut input).await?;
363        let first_epoch = barrier.epoch;
364        let _barrier_epoch = barrier.epoch; // Save epoch for later use (unused in normal execution)
365        // The first barrier message should be propagated.
366        yield Message::Barrier(barrier);
367        self.state_table.init_epoch(first_epoch).await?;
368
369        // default to normal ingestion
370        let mut inner_state =
371            Box::new(MaterializeStreamState::<BarrierMutationType>::NormalIngestion);
372        // Initialize staging table for refreshable materialized views
373        if let Some(ref mut refresh_args) = self.refresh_args {
374            refresh_args.staging_table.init_epoch(first_epoch).await?;
375
376            // Initialize progress table and load existing progress for recovery
377            refresh_args.progress_table.recover(first_epoch).await?;
378
379            // Check if refresh is already in progress (recovery scenario)
380            let progress_stats = refresh_args.progress_table.get_progress_stats();
381            if progress_stats.total_vnodes > 0 && !progress_stats.is_complete() {
382                refresh_args.is_refreshing = true;
383                tracing::info!(
384                    total_vnodes = progress_stats.total_vnodes,
385                    completed_vnodes = progress_stats.completed_vnodes,
386                    "Recovered refresh in progress, resuming refresh operation"
387                );
388
389                // Since stage info is no longer stored in progress table,
390                // we need to determine recovery state differently.
391                // For now, assume all incomplete VNodes need to continue merging
392                let incomplete_vnodes: Vec<_> = refresh_args
393                    .progress_table
394                    .get_all_progress()
395                    .iter()
396                    .filter(|(_, entry)| !entry.is_completed)
397                    .map(|(&vnode, _)| vnode)
398                    .collect();
399
400                if !incomplete_vnodes.is_empty() {
401                    // Some VNodes are incomplete, need to resume refresh operation
402                    tracing::info!(
403                        incomplete_vnodes = incomplete_vnodes.len(),
404                        "Recovery detected incomplete VNodes, resuming refresh operation"
405                    );
406                    // Since stage tracking is now in memory, we'll determine the appropriate
407                    // stage based on the executor's internal state machine
408                } else {
409                    // This should not happen if is_complete() returned false, but handle it gracefully
410                    tracing::warn!("Unexpected recovery state: no incomplete VNodes found");
411                }
412            }
413        }
414
415        // Determine initial execution stage (for recovery scenarios)
416        if let Some(ref refresh_args) = self.refresh_args
417            && refresh_args.is_refreshing
418        {
419            // Recovery logic: Check if there are incomplete vnodes from previous run
420            let incomplete_vnodes: Vec<_> = refresh_args
421                .progress_table
422                .get_all_progress()
423                .iter()
424                .filter(|(_, entry)| !entry.is_completed)
425                .map(|(&vnode, _)| vnode)
426                .collect();
427            if !incomplete_vnodes.is_empty() {
428                // Resume from merge stage since some VNodes were left incomplete
429                inner_state = Box::new(MaterializeStreamState::<_>::MergingData);
430                tracing::info!(
431                    incomplete_vnodes = incomplete_vnodes.len(),
432                    "Recovery: Resuming refresh from merge stage due to incomplete VNodes"
433                );
434            }
435        }
436
437        // Main execution loop: cycles through Stage 1 -> Stage 2 -> Stage 3 -> Stage 1...
438        'main_loop: loop {
439            match *inner_state {
440                MaterializeStreamState::NormalIngestion => {
441                    #[for_await]
442                    '_normal_ingest: for msg in input.by_ref() {
443                        let msg = msg?;
444                        if let Some(cache) = &mut self.materialize_cache {
445                            cache.evict();
446                        }
447
448                        match msg {
449                            Message::Watermark(w) => {
450                                if self.cleaned_by_ttl_watermark
451                                    && self.state_table.clean_watermark_index == Some(w.col_idx)
452                                {
453                                    self.state_table.update_watermark(w.val.clone());
454                                }
455                                yield Message::Watermark(w);
456                            }
457                            Message::Chunk(chunk) if self.is_dummy_table => {
458                                self.metrics
459                                    .materialize_input_row_count
460                                    .inc_by(chunk.cardinality() as u64);
461                                yield Message::Chunk(chunk);
462                            }
463                            Message::Chunk(chunk) => {
464                                self.metrics
465                                    .materialize_input_row_count
466                                    .inc_by(chunk.cardinality() as u64);
467
468                                // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
469                                // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
470                                // and the conflict behavior is overwrite. We can rely on the state table to overwrite the conflicting rows in the storage,
471                                // while outputting inconsistent changes to downstream which no one will subscribe to.
472                                // For tables with watermark TTL (indicated by `cleaned_by_ttl_watermark`), conflict check must be enabled.
473                                // TODO(ttl): differentiate between table consistency and downstream consistency.
474                                let optimized_conflict_behavior = if let ConflictBehavior::Overwrite =
475                                    self.conflict_behavior
476                                    && !self.state_table.is_consistent_op()
477                                    && !self.cleaned_by_ttl_watermark
478                                    && self.version_column_indices.is_empty()
479                                {
480                                    ConflictBehavior::NoCheck
481                                } else {
482                                    self.conflict_behavior
483                                };
484
485                                match optimized_conflict_behavior {
486                                    checked_conflict_behaviors!() => {
487                                        if chunk.cardinality() == 0 {
488                                            // empty chunk
489                                            continue;
490                                        }
491
492                                        // For refreshable materialized views, write to staging table during refresh
493                                        // Do not use generate_output here.
494                                        if let Some(ref mut refresh_args) = self.refresh_args
495                                            && refresh_args.is_refreshing
496                                        {
497                                            let key_chunk = chunk
498                                                .clone()
499                                                .project(self.state_table.pk_indices());
500                                            tracing::trace!(
501                                                staging_chunk = %key_chunk.to_pretty(),
502                                                input_chunk = %chunk.to_pretty(),
503                                                "writing to staging table"
504                                            );
505                                            if cfg!(debug_assertions) {
506                                                // refreshable source should be append-only
507                                                assert!(
508                                                    key_chunk
509                                                        .ops()
510                                                        .iter()
511                                                        .all(|op| op == &Op::Insert)
512                                                );
513                                            }
514                                            refresh_args
515                                                .staging_table
516                                                .write_chunk(key_chunk.clone());
517                                            refresh_args.staging_table.try_flush().await?;
518                                        }
519
520                                        let cache = self.materialize_cache.as_mut().unwrap();
521                                        let change_buffer =
522                                            cache.handle_new(chunk, &self.state_table).await?;
523
524                                        let output_chunk = if self.stream_key_indices
525                                            == self.state_table.pk_indices()
526                                        {
527                                            change_buffer.into_chunk::<{ cb_kind::RETRACT }>(
528                                                data_types.clone(),
529                                            )
530                                        } else {
531                                            // We only hit this branch for TTL-ed tables for now.
532                                            // Assert stream key is a superset of pk.
533                                            debug_assert!(
534                                                self.state_table
535                                                    .pk_indices()
536                                                    .iter()
537                                                    .all(|&i| self.stream_key_indices.contains(&i))
538                                            );
539                                            change_buffer.into_chunk_with_key(
540                                                data_types.clone(),
541                                                &self.stream_key_indices,
542                                            )
543                                        };
544
545                                        match output_chunk {
546                                            Some(output_chunk) => {
547                                                self.state_table.write_chunk(output_chunk.clone());
548                                                self.state_table.try_flush().await?;
549                                                yield Message::Chunk(output_chunk);
550                                            }
551                                            None => continue,
552                                        }
553                                    }
554                                    ConflictBehavior::NoCheck => {
555                                        self.state_table.write_chunk(chunk.clone());
556                                        self.state_table.try_flush().await?;
557
558                                        // For refreshable materialized views, also write to staging table during refresh
559                                        if let Some(ref mut refresh_args) = self.refresh_args
560                                            && refresh_args.is_refreshing
561                                        {
562                                            let key_chunk = chunk
563                                                .clone()
564                                                .project(self.state_table.pk_indices());
565                                            tracing::trace!(
566                                                staging_chunk = %key_chunk.to_pretty(),
567                                                input_chunk = %chunk.to_pretty(),
568                                                "writing to staging table"
569                                            );
570                                            if cfg!(debug_assertions) {
571                                                // refreshable source should be append-only
572                                                assert!(
573                                                    key_chunk
574                                                        .ops()
575                                                        .iter()
576                                                        .all(|op| op == &Op::Insert)
577                                                );
578                                            }
579                                            refresh_args
580                                                .staging_table
581                                                .write_chunk(key_chunk.clone());
582                                            refresh_args.staging_table.try_flush().await?;
583                                        }
584
585                                        yield Message::Chunk(chunk);
586                                    }
587                                }
588                            }
589                            Message::Barrier(barrier) => {
590                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
591                                    barrier,
592                                    expect_next_state: Box::new(
593                                        MaterializeStreamState::NormalIngestion,
594                                    ),
595                                };
596                                continue 'main_loop;
597                            }
598                        }
599                    }
600
601                    return Err(StreamExecutorError::from(ErrorKind::Uncategorized(
602                        anyhow::anyhow!(
603                            "Input stream terminated unexpectedly during normal ingestion"
604                        ),
605                    )));
606                }
607                MaterializeStreamState::MergingData => {
608                    let Some(refresh_args) = self.refresh_args.as_mut() else {
609                        panic!(
610                            "MaterializeExecutor entered CleanUp state without refresh_args configured"
611                        );
612                    };
613                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: Starting table replacement operation");
614
615                    debug_assert_eq!(
616                        self.state_table.vnodes(),
617                        refresh_args.staging_table.vnodes()
618                    );
619                    debug_assert_eq!(
620                        refresh_args.staging_table.vnodes(),
621                        refresh_args.progress_table.vnodes()
622                    );
623
624                    let mut rows_to_delete = vec![];
625                    let mut merge_complete = false;
626                    let mut pending_barrier: Option<Barrier> = None;
627
628                    // Scope to limit immutable borrows to state tables
629                    {
630                        let left_input = input.by_ref().map(Either::Left);
631                        let right_merge_sort = pin!(
632                            Self::make_mergesort_stream(
633                                &self.state_table,
634                                &refresh_args.staging_table,
635                                &mut refresh_args.progress_table
636                            )
637                            .map(Either::Right)
638                        );
639
640                        // Prefer to select input stream to handle barriers promptly
641                        // Rebuild the merge stream each time processing a barrier
642                        let mut merge_stream =
643                            select_with_strategy(left_input, right_merge_sort, |_: &mut ()| {
644                                stream::PollNext::Left
645                            });
646
647                        #[for_await]
648                        'merge_stream: for either in &mut merge_stream {
649                            match either {
650                                Either::Left(msg) => {
651                                    let msg = msg?;
652                                    match msg {
653                                        Message::Watermark(w) => yield Message::Watermark(w),
654                                        Message::Chunk(chunk) => {
655                                            tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
656                                        }
657                                        Message::Barrier(b) => {
658                                            pending_barrier = Some(b);
659                                            break 'merge_stream;
660                                        }
661                                    }
662                                }
663                                Either::Right(result) => {
664                                    match result? {
665                                        Some((_vnode, row)) => {
666                                            rows_to_delete.push(row);
667                                        }
668                                        None => {
669                                            // Merge stream finished
670                                            merge_complete = true;
671
672                                            // If the merge stream finished, we need to wait for the next barrier to commit states
673                                        }
674                                    }
675                                }
676                            }
677                        }
678                    }
679
680                    // Process collected rows for deletion
681                    for row in &rows_to_delete {
682                        self.state_table.delete(row);
683                    }
684                    if !rows_to_delete.is_empty() {
685                        let to_delete_chunk = StreamChunk::from_rows(
686                            &rows_to_delete
687                                .iter()
688                                .map(|row| (Op::Delete, row))
689                                .collect_vec(),
690                            &self.schema.data_types(),
691                        );
692
693                        yield Message::Chunk(to_delete_chunk);
694                    }
695
696                    // should wait for at least one barrier
697                    assert!(pending_barrier.is_some(), "pending barrier is not set");
698
699                    *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
700                        barrier: pending_barrier.unwrap(),
701                        expect_next_state: if merge_complete {
702                            Box::new(MaterializeStreamState::CleanUp)
703                        } else {
704                            Box::new(MaterializeStreamState::MergingData)
705                        },
706                    };
707                    continue 'main_loop;
708                }
709                MaterializeStreamState::CleanUp => {
710                    let Some(refresh_args) = self.refresh_args.as_mut() else {
711                        panic!(
712                            "MaterializeExecutor entered MergingData state without refresh_args configured"
713                        );
714                    };
715                    tracing::info!(table_id = %refresh_args.table_id, "on_load_finish: resuming CleanUp Stage");
716
717                    #[for_await]
718                    for msg in input.by_ref() {
719                        let msg = msg?;
720                        match msg {
721                            Message::Watermark(w) => {
722                                if self.cleaned_by_ttl_watermark
723                                    && self.state_table.clean_watermark_index == Some(w.col_idx)
724                                {
725                                    self.state_table.update_watermark(w.val.clone());
726                                }
727                                yield Message::Watermark(w)
728                            }
729                            Message::Chunk(chunk) => {
730                                tracing::warn!(chunk = %chunk.to_pretty(), "chunk is ignored during merge phase");
731                            }
732                            Message::Barrier(barrier) if !barrier.is_checkpoint() => {
733                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
734                                    barrier,
735                                    expect_next_state: Box::new(MaterializeStreamState::CleanUp),
736                                };
737                                continue 'main_loop;
738                            }
739                            Message::Barrier(barrier) => {
740                                let staging_table_id = refresh_args.staging_table.table_id();
741                                let epoch = barrier.epoch;
742                                self.local_barrier_manager.report_refresh_finished(
743                                    epoch,
744                                    self.actor_context.id,
745                                    refresh_args.table_id,
746                                    staging_table_id,
747                                );
748                                tracing::debug!(table_id = %refresh_args.table_id, "on_load_finish: Reported staging table truncation and diff applied");
749
750                                *inner_state = MaterializeStreamState::CommitAndYieldBarrier {
751                                    barrier,
752                                    expect_next_state: Box::new(
753                                        MaterializeStreamState::RefreshEnd {
754                                            on_complete_epoch: epoch,
755                                        },
756                                    ),
757                                };
758                                continue 'main_loop;
759                            }
760                        }
761                    }
762                }
763                MaterializeStreamState::RefreshEnd { on_complete_epoch } => {
764                    let Some(refresh_args) = self.refresh_args.as_mut() else {
765                        panic!(
766                            "MaterializeExecutor entered RefreshEnd state without refresh_args configured"
767                        );
768                    };
769                    let staging_table_id = refresh_args.staging_table.table_id();
770
771                    // Wait for staging table truncation to complete
772                    let staging_store = refresh_args.staging_table.state_store().clone();
773                    staging_store
774                        .try_wait_epoch(
775                            HummockReadEpoch::Committed(on_complete_epoch.prev),
776                            TryWaitEpochOptions {
777                                table_id: staging_table_id,
778                            },
779                        )
780                        .await?;
781
782                    tracing::info!(table_id = %refresh_args.table_id, "RefreshEnd: Refresh completed");
783
784                    if let Some(ref mut refresh_args) = self.refresh_args {
785                        refresh_args.is_refreshing = false;
786                    }
787                    *inner_state = MaterializeStreamState::NormalIngestion;
788                    continue 'main_loop;
789                }
790                MaterializeStreamState::CommitAndYieldBarrier {
791                    barrier,
792                    mut expect_next_state,
793                } => {
794                    if let Some(ref mut refresh_args) = self.refresh_args {
795                        match barrier.mutation.as_deref() {
796                            Some(Mutation::RefreshStart {
797                                table_id: refresh_table_id,
798                                associated_source_id: _,
799                            }) if *refresh_table_id == refresh_args.table_id => {
800                                debug_assert!(
801                                    !refresh_args.is_refreshing,
802                                    "cannot start refresh twice"
803                                );
804                                refresh_args.is_refreshing = true;
805                                tracing::info!(table_id = %refresh_table_id, "RefreshStart barrier received");
806
807                                // Initialize progress tracking for all VNodes
808                                Self::init_refresh_progress(
809                                    &self.state_table,
810                                    &mut refresh_args.progress_table,
811                                    barrier.epoch.curr,
812                                )?;
813                            }
814                            Some(Mutation::LoadFinish {
815                                associated_source_id: load_finish_source_id,
816                            }) => {
817                                // Get associated source id from table catalog
818                                let associated_source_id: SourceId = match refresh_args
819                                    .table_catalog
820                                    .optional_associated_source_id
821                                {
822                                    Some(id) => id.into(),
823                                    None => unreachable!("associated_source_id is not set"),
824                                };
825
826                                if *load_finish_source_id == associated_source_id {
827                                    tracing::info!(
828                                        %load_finish_source_id,
829                                        "LoadFinish received, starting data replacement"
830                                    );
831                                    expect_next_state =
832                                        Box::new(MaterializeStreamState::<_>::MergingData);
833                                }
834                            }
835                            _ => {}
836                        }
837                    }
838
839                    // ===== normal operation =====
840
841                    // If a downstream mv depends on the current table, we need to do conflict check again.
842                    if !self.may_have_downstream
843                        && barrier.has_more_downstream_fragments(self.actor_context.id)
844                    {
845                        self.may_have_downstream = true;
846                    }
847                    Self::may_update_depended_subscriptions(
848                        &mut self.subscriber_ids,
849                        &barrier,
850                        mv_table_id,
851                    );
852                    let op_consistency_level = get_op_consistency_level(
853                        self.cleaned_by_ttl_watermark,
854                        self.conflict_behavior,
855                        self.may_have_downstream,
856                        &self.subscriber_ids,
857                    );
858                    let post_commit = self
859                        .state_table
860                        .commit_may_switch_consistent_op(barrier.epoch, op_consistency_level)
861                        .await?;
862
863                    let update_vnode_bitmap = barrier.as_update_vnode_bitmap(self.actor_context.id);
864
865                    // Commit staging table for refreshable materialized views
866                    let refresh_post_commit = if let Some(ref mut refresh_args) = self.refresh_args
867                    {
868                        // Commit progress table for fault tolerance
869
870                        Some((
871                            refresh_args.staging_table.commit(barrier.epoch).await?,
872                            refresh_args.progress_table.commit(barrier.epoch).await?,
873                        ))
874                    } else {
875                        None
876                    };
877
878                    let b_epoch = barrier.epoch;
879                    yield Message::Barrier(barrier);
880
881                    // Update the vnode bitmap for the state table if asked.
882                    if let Some((_, cache_may_stale)) = post_commit
883                        .post_yield_barrier(update_vnode_bitmap.clone())
884                        .await?
885                        && cache_may_stale
886                        && let Some(cache) = &mut self.materialize_cache
887                    {
888                        cache.clear();
889                    }
890
891                    // Handle staging table post commit
892                    if let Some((staging_post_commit, progress_post_commit)) = refresh_post_commit {
893                        staging_post_commit
894                            .post_yield_barrier(update_vnode_bitmap.clone())
895                            .await?;
896                        progress_post_commit
897                            .post_yield_barrier(update_vnode_bitmap)
898                            .await?;
899                    }
900
901                    self.metrics
902                        .materialize_current_epoch
903                        .set(b_epoch.curr as i64);
904
905                    // ====== transition to next state ======
906
907                    *inner_state = *expect_next_state;
908                }
909            }
910        }
911    }
912
    /// Stream that yields rows to be deleted from main table.
    /// Yields `Some((vnode, row))` for rows that exist in main but not in staging.
    /// Yields `None` when finished processing all vnodes.
    ///
    /// Implementation: a per-vnode sort-merge scan ("anti-join") over the two
    /// tables' PK-ordered iterators. After each main-table row, the scan position
    /// is persisted into `progress_table`, so an interrupted merge resumes
    /// strictly after the last processed key instead of restarting the vnode.
    ///
    /// NOTE(review): rows whose keys exist in both tables are skipped here even if
    /// their values differ — presumably staging values are applied to the main
    /// table elsewhere; confirm against the caller (`MergingData` state).
    #[try_stream(ok = Option<(VirtualNode, OwnedRow)>, error = StreamExecutorError)]
    async fn make_mergesort_stream<'a>(
        main_table: &'a StateTableInner<S, SD>,
        staging_table: &'a StateTableInner<S, SD>,
        progress_table: &'a mut RefreshProgressTable<S>,
    ) {
        for vnode in main_table.vnodes().clone().iter_vnodes() {
            let mut processed_rows = 0;
            // Check if this VNode has already been completed (for fault tolerance),
            // and compute the PK range to scan: resume after the recorded position
            // (exclusive bound) or scan the whole vnode when starting fresh.
            let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
                if let Some(current_entry) = progress_table.get_progress(vnode) {
                    // Skip already completed VNodes during recovery
                    if current_entry.is_completed {
                        tracing::debug!(
                            vnode = vnode.to_index(),
                            "Skipping already completed VNode during recovery"
                        );
                        continue;
                    }
                    // Carry over the prior row count so resumed progress stays monotonic.
                    processed_rows += current_entry.processed_rows;
                    tracing::debug!(vnode = vnode.to_index(), "Started merging VNode");

                    if let Some(current_state) = &current_entry.current_pos {
                        (Bound::Excluded(current_state.clone()), Bound::Unbounded)
                    } else {
                        (Bound::Unbounded, Bound::Unbounded)
                    }
                } else {
                    (Bound::Unbounded, Bound::Unbounded)
                };

            // Both iterators yield rows in PK order within the vnode — the
            // precondition for the dual-pointer merge below.
            let iter_main = main_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;
            let iter_staging = staging_table
                .iter_keyed_row_with_vnode(
                    vnode,
                    &pk_range,
                    PrefetchOptions::prefetch_for_large_range_scan(),
                )
                .await?;

            pin_mut!(iter_main);
            pin_mut!(iter_staging);

            // Sort-merge join implementation using dual pointers
            let mut main_item: Option<KeyedRow<Bytes>> = iter_main.next().await.transpose()?;
            let mut staging_item: Option<KeyedRow<Bytes>> =
                iter_staging.next().await.transpose()?;

            while let Some(main_kv) = main_item {
                let main_key = main_kv.key();

                // Advance staging iterator until we find a key >= main_key
                let mut should_delete = false;
                while let Some(staging_kv) = &staging_item {
                    let staging_key = staging_kv.key();
                    match main_key.cmp(staging_key) {
                        std::cmp::Ordering::Greater => {
                            // main_key > staging_key, advance staging
                            staging_item = iter_staging.next().await.transpose()?;
                        }
                        std::cmp::Ordering::Equal => {
                            // Keys match, this row exists in both tables, no need to delete
                            break;
                        }
                        std::cmp::Ordering::Less => {
                            // main_key < staging_key, main row doesn't exist in staging, delete it
                            should_delete = true;
                            break;
                        }
                    }
                }

                // If staging_item is None, all remaining main rows should be deleted
                // (staging was exhausted by the loop above, or was empty to begin with).
                if staging_item.is_none() {
                    should_delete = true;
                }

                if should_delete {
                    yield Some((vnode, main_kv.row().clone()));
                }

                // Record progress at this key BEFORE advancing, so a recovery
                // resumes strictly after the last row we finished handling.
                processed_rows += 1;
                tracing::debug!(
                    "set progress table: vnode = {:?}, processed_rows = {:?}",
                    vnode,
                    processed_rows
                );
                progress_table.set_progress(
                    vnode,
                    Some(
                        main_kv
                            .row()
                            .project(main_table.pk_indices())
                            .to_owned_row(),
                    ),
                    false,
                    processed_rows,
                )?;
                main_item = iter_main.next().await.transpose()?;
            }

            // Mark this VNode as completed so recovery skips it entirely.
            if let Some(current_entry) = progress_table.get_progress(vnode) {
                progress_table.set_progress(
                    vnode,
                    current_entry.current_pos.clone(),
                    true, // completed
                    current_entry.processed_rows,
                )?;

                tracing::debug!(vnode = vnode.to_index(), "Completed merging VNode");
            }
        }

        // Signal completion
        yield None;
    }
1040
1041    /// return true when changed
1042    fn may_update_depended_subscriptions(
1043        depended_subscriptions: &mut HashSet<SubscriberId>,
1044        barrier: &Barrier,
1045        mv_table_id: TableId,
1046    ) {
1047        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
1048            if !depended_subscriptions.insert(subscriber_id) {
1049                warn!(
1050                    ?depended_subscriptions,
1051                    %mv_table_id,
1052                    %subscriber_id,
1053                    "subscription id already exists"
1054                );
1055            }
1056        }
1057
1058        if let Some(subscriptions_to_drop) = barrier.as_subscriptions_to_drop() {
1059            for SubscriptionUpstreamInfo {
1060                subscriber_id,
1061                upstream_mv_table_id,
1062            } in subscriptions_to_drop
1063            {
1064                if *upstream_mv_table_id == mv_table_id
1065                    && !depended_subscriptions.remove(subscriber_id)
1066                {
1067                    warn!(
1068                        ?depended_subscriptions,
1069                        %mv_table_id,
1070                        %subscriber_id,
1071                        "drop non existing subscriber_id id"
1072                    );
1073                }
1074            }
1075        }
1076    }
1077
1078    /// Initialize refresh progress tracking for all `VNodes`
1079    fn init_refresh_progress(
1080        state_table: &StateTableInner<S, SD>,
1081        progress_table: &mut RefreshProgressTable<S>,
1082        _epoch: u64,
1083    ) -> StreamExecutorResult<()> {
1084        debug_assert_eq!(state_table.vnodes(), progress_table.vnodes());
1085
1086        // Initialize progress for all VNodes in the current bitmap
1087        for vnode in state_table.vnodes().iter_vnodes() {
1088            progress_table.set_progress(
1089                vnode, None,  // initial position
1090                false, // not completed yet
1091                0,     // initial processed rows
1092            )?;
1093        }
1094
1095        tracing::info!(
1096            vnodes_count = state_table.vnodes().count_ones(),
1097            "Initialized refresh progress tracking for all VNodes"
1098        );
1099
1100        Ok(())
1101    }
1102}
1103
1104impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
1105    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
1106    #[cfg(any(test, feature = "test"))]
1107    pub async fn for_test(
1108        input: Executor,
1109        store: S,
1110        table_id: TableId,
1111        keys: Vec<ColumnOrder>,
1112        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1113        watermark_epoch: AtomicU64Ref,
1114        conflict_behavior: ConflictBehavior,
1115    ) -> Self {
1116        Self::for_test_inner(
1117            input,
1118            store,
1119            table_id,
1120            keys,
1121            column_ids,
1122            watermark_epoch,
1123            conflict_behavior,
1124            None,
1125        )
1126        .await
1127    }
1128
1129    #[cfg(any(test, feature = "test"))]
1130    #[allow(clippy::too_many_arguments)]
1131    pub async fn for_test_with_stream_key(
1132        input: Executor,
1133        store: S,
1134        table_id: TableId,
1135        keys: Vec<ColumnOrder>,
1136        stream_key: Vec<usize>,
1137        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1138        watermark_epoch: AtomicU64Ref,
1139        conflict_behavior: ConflictBehavior,
1140    ) -> Self {
1141        Self::for_test_inner(
1142            input,
1143            store,
1144            table_id,
1145            keys,
1146            column_ids,
1147            watermark_epoch,
1148            conflict_behavior,
1149            Some(stream_key),
1150        )
1151        .await
1152    }
1153
1154    #[cfg(any(test, feature = "test"))]
1155    #[allow(clippy::too_many_arguments)]
1156    async fn for_test_inner(
1157        input: Executor,
1158        store: S,
1159        table_id: TableId,
1160        keys: Vec<ColumnOrder>,
1161        column_ids: Vec<risingwave_common::catalog::ColumnId>,
1162        watermark_epoch: AtomicU64Ref,
1163        conflict_behavior: ConflictBehavior,
1164        stream_key: Option<Vec<usize>>,
1165    ) -> Self {
1166        use risingwave_common::util::iter_util::ZipEqFast;
1167
1168        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
1169        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
1170        let schema = input.schema().clone();
1171        let columns: Vec<ColumnDesc> = column_ids
1172            .into_iter()
1173            .zip_eq_fast(schema.fields.iter())
1174            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
1175            .collect_vec();
1176
1177        let row_serde = BasicSerde::new(
1178            Arc::from((0..columns.len()).collect_vec()),
1179            Arc::from(columns.clone().into_boxed_slice()),
1180        );
1181        let stream_key_indices = stream_key.unwrap_or_else(|| arrange_columns.clone());
1182        let mut table_catalog = crate::common::table::test_utils::gen_pbtable(
1183            table_id,
1184            columns,
1185            arrange_order_types,
1186            arrange_columns.clone(),
1187            0,
1188        );
1189        table_catalog.stream_key = stream_key_indices.iter().map(|i| *i as i32).collect();
1190        let state_table = StateTableInner::from_table_catalog(&table_catalog, store, None).await;
1191
1192        let unused = StreamingMetrics::unused();
1193        let metrics = unused.new_materialize_metrics(table_id, 1.into(), 2.into());
1194        let cache_metrics = unused.new_materialize_cache_metrics(table_id, 1.into(), 2.into());
1195
1196        Self {
1197            input,
1198            schema,
1199            state_table,
1200            stream_key_indices,
1201            arrange_key_indices: arrange_columns.clone(),
1202            actor_context: ActorContext::for_test(0),
1203            materialize_cache: MaterializeCache::new(
1204                watermark_epoch,
1205                MetricsInfo::for_test(),
1206                row_serde,
1207                vec![],
1208                conflict_behavior,
1209                None,
1210                cache_metrics,
1211            ),
1212            conflict_behavior,
1213            cleaned_by_ttl_watermark: false,
1214            version_column_indices: vec![],
1215            is_dummy_table: false,
1216            may_have_downstream: true,
1217            subscriber_ids: HashSet::new(),
1218            metrics,
1219            refresh_args: None, // Test constructor doesn't support refresh functionality
1220            local_barrier_manager: LocalBarrierManager::for_test(),
1221        }
1222    }
1223}
1224
1225impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
1226    fn execute(self: Box<Self>) -> BoxedMessageStream {
1227        self.execute_inner().boxed()
1228    }
1229}
1230
1231impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
1232    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1233        f.debug_struct("MaterializeExecutor")
1234            .field("arrange_key_indices", &self.arrange_key_indices)
1235            .field("stream_key_indices", &self.stream_key_indices)
1236            .finish()
1237    }
1238}
1239
1240#[cfg(test)]
1241mod tests {
1242
1243    use std::iter;
1244    use std::sync::atomic::AtomicU64;
1245
1246    use rand::rngs::SmallRng;
1247    use rand::{Rng, RngCore, SeedableRng};
1248    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
1249    use risingwave_common::catalog::Field;
1250    use risingwave_common::util::epoch::test_epoch;
1251    use risingwave_common::util::sort_util::OrderType;
1252    use risingwave_hummock_sdk::HummockReadEpoch;
1253    use risingwave_storage::memory::MemoryStateStore;
1254    use risingwave_storage::table::batch_table::BatchTable;
1255
1256    use super::*;
1257    use crate::executor::test_utils::*;
1258
    #[tokio::test]
    async fn test_materialize_executor() {
        // End-to-end check: feed two chunks through a MaterializeExecutor and
        // verify the committed table contents via batch point lookups after
        // each checkpoint barrier.
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        // Message order: barrier(1), chunk1, barrier(2), chunk2, barrier(3) —
        // the executor passes each message through in the same order.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view over the same store/table, used to read back results.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier (epoch 1).
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1, which the executor forwards downstream.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        // after the barrier at epoch 2 commits it.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2 (inserts (7,8), deletes (3,6)).
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        // after the barrier at epoch 3 commits it.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1361
1362    // https://github.com/risingwavelabs/risingwave/issues/13346
1363    #[tokio::test]
1364    async fn test_upsert_stream() {
1365        // Prepare storage and memtable.
1366        let memory_state_store = MemoryStateStore::new();
1367        let table_id = TableId::new(1);
1368        // Two columns of int32 type, the first column is PK.
1369        let schema = Schema::new(vec![
1370            Field::unnamed(DataType::Int32),
1371            Field::unnamed(DataType::Int32),
1372        ]);
1373        let column_ids = vec![0.into(), 1.into()];
1374
1375        // test double insert one pk, the latter needs to override the former.
1376        let chunk1 = StreamChunk::from_pretty(
1377            " i i
1378            + 1 1",
1379        );
1380
1381        let chunk2 = StreamChunk::from_pretty(
1382            " i i
1383            + 1 2
1384            - 1 2",
1385        );
1386
1387        // Prepare stream executors.
1388        let source = MockSource::with_messages(vec![
1389            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1390            Message::Chunk(chunk1),
1391            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1392            Message::Chunk(chunk2),
1393            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1394        ])
1395        .into_executor(schema.clone(), StreamKey::new());
1396
1397        let order_types = vec![OrderType::ascending()];
1398        let column_descs = vec![
1399            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1400            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1401        ];
1402
1403        let table = BatchTable::for_test(
1404            memory_state_store.clone(),
1405            table_id,
1406            column_descs,
1407            order_types,
1408            vec![0],
1409            vec![0, 1],
1410        );
1411
1412        let mut materialize_executor = MaterializeExecutor::for_test(
1413            source,
1414            memory_state_store,
1415            table_id,
1416            vec![ColumnOrder::new(0, OrderType::ascending())],
1417            column_ids,
1418            Arc::new(AtomicU64::new(0)),
1419            ConflictBehavior::Overwrite,
1420        )
1421        .await
1422        .boxed()
1423        .execute();
1424        materialize_executor.next().await.transpose().unwrap();
1425
1426        materialize_executor.next().await.transpose().unwrap();
1427        materialize_executor.next().await.transpose().unwrap();
1428        materialize_executor.next().await.transpose().unwrap();
1429
1430        match materialize_executor.next().await.transpose().unwrap() {
1431            Some(Message::Barrier(_)) => {
1432                let row = table
1433                    .get_row(
1434                        &OwnedRow::new(vec![Some(1_i32.into())]),
1435                        HummockReadEpoch::NoWait(u64::MAX),
1436                    )
1437                    .await
1438                    .unwrap();
1439                assert!(row.is_none());
1440            }
1441            _ => unreachable!(),
1442        }
1443    }
1444
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Verifies that with `ConflictBehavior::Overwrite` a later insert on an
        // existing PK overwrites the previous row, both within a single chunk
        // and across chunks.
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Cross-chunk conflicts: pk 1 is set back to value 3, pk 2 becomes 6.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // One more conflicting insert on pk 1. It is sent after the epoch-2
        // barrier and is never pulled by the assertions below.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: the committed state must reflect last-write-wins.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // pk 3 was inserted once: (3, 6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: chunk2's (1, 3) overwrote chunk1's rows.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: chunk2's (2, 6) overwrote chunk1's (2, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1563
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Verifies `ConflictBehavior::Overwrite` semantics for deletes and
        // updates: deletes with a stale value or an inexistent pk are tolerated,
        // and an update on an inexistent pk effectively becomes an insert.
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert after an update on the same pk (8): the final (8, 3) must win.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert (pk 1), update with a stale old value (pk 2), and
        // an update on an inexistent pk (9, should become an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: state after chunk1 is committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-3 barrier: state after chunk2 is committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was inserted normally.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Epoch-4 barrier: state after chunk3 is committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // Insert conflict on pk 1: the new value 5 overwrote 4.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1750
1751    #[tokio::test]
1752    async fn test_change_buffer_into_chunk_with_stream_key() {
1753        // Table PK is (col0), but stream key is (col0, col1). When the value column changes due
1754        // to overwrite conflict handling, we should output `- old` + `+ new` rather than `U-`/`U+`.
1755        let memory_state_store = MemoryStateStore::new();
1756        let table_id = TableId::new(1);
1757        let schema = Schema::new(vec![
1758            Field::unnamed(DataType::Int32),
1759            Field::unnamed(DataType::Int32),
1760        ]);
1761        let column_ids = vec![0.into(), 1.into()];
1762
1763        let chunk1 = StreamChunk::from_pretty(
1764            " i i
1765            + 1 4",
1766        );
1767        let chunk2 = StreamChunk::from_pretty(
1768            " i i
1769            + 1 5",
1770        );
1771
1772        let source = MockSource::with_messages(vec![
1773            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1774            Message::Chunk(chunk1),
1775            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1776            Message::Chunk(chunk2),
1777            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1778        ])
1779        .into_executor(schema.clone(), StreamKey::new());
1780
1781        let mut materialize_executor = MaterializeExecutor::for_test_with_stream_key(
1782            source,
1783            memory_state_store,
1784            table_id,
1785            vec![ColumnOrder::new(0, OrderType::ascending())],
1786            vec![0, 1],
1787            column_ids,
1788            Arc::new(AtomicU64::new(0)),
1789            ConflictBehavior::Overwrite,
1790        )
1791        .await
1792        .boxed()
1793        .execute();
1794
1795        // init barrier + first insert
1796        materialize_executor.next().await.transpose().unwrap();
1797        materialize_executor.next().await.transpose().unwrap();
1798
1799        // commit barrier
1800        materialize_executor.next().await.transpose().unwrap();
1801
1802        // overwrite conflict should be converted into delete+insert due to stream key mismatch
1803        match materialize_executor.next().await.transpose().unwrap() {
1804            Some(Message::Chunk(chunk)) => {
1805                assert_eq!(
1806                    chunk.compact_vis(),
1807                    StreamChunk::from_pretty(
1808                        " i i
1809                        - 1 4
1810                        + 1 5"
1811                    )
1812                );
1813            }
1814            other => panic!("expect chunk, got {other:?}"),
1815        }
1816    }
1817
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Verifies that with `ConflictBehavior::IgnoreConflict` an insert on an
        // already-existing pk is dropped and the first materialized row is kept.
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Cross-chunk conflicting inserts on pk 1 and pk 2; both must be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // One more conflicting insert on pk 1. It is sent after the epoch-2
        // barrier and is never pulled by the assertions below.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: the committed state must keep the FIRST insert per pk.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // pk 3 was inserted once: (3, 6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1 keeps its first value (1, 3); later inserts were ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2 keeps (2, 5); chunk2's (2, 6) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1936
1937    #[tokio::test]
1938    async fn test_ignore_delete_then_insert() {
1939        // Prepare storage and memtable.
1940        let memory_state_store = MemoryStateStore::new();
1941        let table_id = TableId::new(1);
1942        // Two columns of int32 type, the first column is PK.
1943        let schema = Schema::new(vec![
1944            Field::unnamed(DataType::Int32),
1945            Field::unnamed(DataType::Int32),
1946        ]);
1947        let column_ids = vec![0.into(), 1.into()];
1948
1949        // test insert after delete one pk, the latter insert should succeed.
1950        let chunk1 = StreamChunk::from_pretty(
1951            " i i
1952            + 1 3
1953            - 1 3
1954            + 1 6",
1955        );
1956
1957        // Prepare stream executors.
1958        let source = MockSource::with_messages(vec![
1959            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1960            Message::Chunk(chunk1),
1961            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1962        ])
1963        .into_executor(schema.clone(), StreamKey::new());
1964
1965        let order_types = vec![OrderType::ascending()];
1966        let column_descs = vec![
1967            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1968            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1969        ];
1970
1971        let table = BatchTable::for_test(
1972            memory_state_store.clone(),
1973            table_id,
1974            column_descs,
1975            order_types,
1976            vec![0],
1977            vec![0, 1],
1978        );
1979
1980        let mut materialize_executor = MaterializeExecutor::for_test(
1981            source,
1982            memory_state_store,
1983            table_id,
1984            vec![ColumnOrder::new(0, OrderType::ascending())],
1985            column_ids,
1986            Arc::new(AtomicU64::new(0)),
1987            ConflictBehavior::IgnoreConflict,
1988        )
1989        .await
1990        .boxed()
1991        .execute();
1992        let _msg1 = materialize_executor
1993            .next()
1994            .await
1995            .transpose()
1996            .unwrap()
1997            .unwrap()
1998            .as_barrier()
1999            .unwrap();
2000        let _msg2 = materialize_executor
2001            .next()
2002            .await
2003            .transpose()
2004            .unwrap()
2005            .unwrap()
2006            .as_chunk()
2007            .unwrap();
2008        let _msg3 = materialize_executor
2009            .next()
2010            .await
2011            .transpose()
2012            .unwrap()
2013            .unwrap()
2014            .as_barrier()
2015            .unwrap();
2016
2017        let row = table
2018            .get_row(
2019                &OwnedRow::new(vec![Some(1_i32.into())]),
2020                HummockReadEpoch::NoWait(u64::MAX),
2021            )
2022            .await
2023            .unwrap();
2024        assert_eq!(
2025            row,
2026            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
2027        );
2028    }
2029
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Verifies `ConflictBehavior::IgnoreConflict` semantics for deletes and
        // updates: conflicting inserts are dropped, while deletes and updates on
        // stale values / inexistent pks are tolerated.
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter should be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Conflicting insert (pk 1, ignored), update with a stale old value
        // (pk 2), and an update on an inexistent pk (9, becomes an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side view of the same table, used to read back materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-2 barrier: state after chunk1 is committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Epoch-3 barrier: state after chunk2 is committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was inserted normally.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Epoch-4 barrier: state after chunk3 is committed.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // pk 1 keeps (1, 4); the conflicting insert (1, 5) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2217
    // Verifies `ConflictBehavior::DoUpdateIfNotNull`: on a PK conflict, NULL
    // values in the incoming row must NOT overwrite existing non-NULL values.
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // should get (8, 2): the trailing `+ 8 .` carries NULL, which must not
        // clobber the non-NULL value 2 written by `U+ 8 2`.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8)
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors: one chunk between each pair of barriers.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), StreamKey::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read view over the same store, used to inspect committed rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk committed. Check (8) -> (8, 2): the NULL in the
        // trailing `+ 8 .` did not overwrite the non-NULL value 2. Also check
        // that (2) keeps its NULL value from `+ 2 .`.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Second stream chunk committed. Check the insert (7) -> (7, 8) and
        // that the deletes removed rows 3 and 5.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Third stream chunk committed. The NULL in `+ 7 .` must not have
        // overwritten (7, 8).
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value: `U+ 2 .` carries NULL, so (2)
                // still has a NULL value.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
2411
2412    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
2413        const KN: u32 = 4;
2414        const SEED: u64 = 998244353;
2415        let mut ret = vec![];
2416        let mut builder =
2417            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
2418        let mut rng = SmallRng::seed_from_u64(SEED);
2419
2420        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
2421            let len = c.data_chunk().capacity();
2422            let mut c = StreamChunkMut::from(c);
2423            for i in 0..len {
2424                c.set_vis(i, rng.random_bool(0.5));
2425            }
2426            c.into()
2427        };
2428        for _ in 0..row_number {
2429            let k = (rng.next_u32() % KN) as i32;
2430            let v = rng.next_u32() as i32;
2431            let op = if rng.random_bool(0.5) {
2432                Op::Insert
2433            } else {
2434                Op::Delete
2435            };
2436            if let Some(c) =
2437                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
2438            {
2439                ret.push(random_vis(c, &mut rng));
2440            }
2441        }
2442        if let Some(c) = builder.take() {
2443            ret.push(random_vis(c, &mut rng));
2444        }
2445        ret
2446    }
2447
2448    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
2449        const N: usize = 100000;
2450
2451        // Prepare storage and memtable.
2452        let memory_state_store = MemoryStateStore::new();
2453        let table_id = TableId::new(1);
2454        // Two columns of int32 type, the first column is PK.
2455        let schema = Schema::new(vec![
2456            Field::unnamed(DataType::Int32),
2457            Field::unnamed(DataType::Int32),
2458        ]);
2459        let column_ids = vec![0.into(), 1.into()];
2460
2461        let chunks = gen_fuzz_data(N, 128);
2462        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
2463            .chain(chunks.into_iter().map(Message::Chunk))
2464            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
2465                test_epoch(2),
2466            ))))
2467            .collect();
2468        // Prepare stream executors.
2469        let source =
2470            MockSource::with_messages(messages).into_executor(schema.clone(), StreamKey::new());
2471
2472        let mut materialize_executor = MaterializeExecutor::for_test(
2473            source,
2474            memory_state_store.clone(),
2475            table_id,
2476            vec![ColumnOrder::new(0, OrderType::ascending())],
2477            column_ids,
2478            Arc::new(AtomicU64::new(0)),
2479            conflict_behavior,
2480        )
2481        .await
2482        .boxed()
2483        .execute();
2484        materialize_executor.expect_barrier().await;
2485
2486        let order_types = vec![OrderType::ascending()];
2487        let column_descs = vec![
2488            ColumnDesc::unnamed(0.into(), DataType::Int32),
2489            ColumnDesc::unnamed(1.into(), DataType::Int32),
2490        ];
2491        let pk_indices = vec![0];
2492
2493        let mut table = StateTable::from_table_catalog(
2494            &crate::common::table::test_utils::gen_pbtable(
2495                TableId::from(1002),
2496                column_descs.clone(),
2497                order_types,
2498                pk_indices,
2499                0,
2500            ),
2501            memory_state_store.clone(),
2502            None,
2503        )
2504        .await;
2505
2506        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2507            // check with state table's memtable
2508            table.write_chunk(c);
2509        }
2510    }
2511
    // Fuzz the materialize executor with `ConflictBehavior::Overwrite`
    // (upsert) conflict handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2516
    // Fuzz the materialize executor with `ConflictBehavior::IgnoreConflict`
    // conflict handling.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2521}