risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27    ColumnDesc, ColumnId, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, RowDeserializer};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
32use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
33use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_storage::mem_table::KeyOp;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
42use crate::common::table::test_utils::gen_pbtable;
43use crate::executor::monitor::MaterializeMetrics;
44use crate::executor::prelude::*;
45
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to materialize.
    input: Executor,

    /// Output schema of `input`; used to deserialize row bytes when building output chunks.
    schema: Schema,

    /// The state table backing the materialized view.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache of current row values keyed by serialized pk, used for pk-conflict handling.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve an incoming row whose pk already exists in the table.
    conflict_behavior: ConflictBehavior,

    /// Index of the column whose value decides whether an overwrite wins, if any.
    version_column_index: Option<u32>,

    /// Whether some downstream fragment may read this table; affects the op consistency level.
    may_have_downstream: bool,

    /// IDs of subscriptions depending on this table; non-empty forces log-store consistency.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,
}
75
76fn get_op_consistency_level(
77    conflict_behavior: ConflictBehavior,
78    may_have_downstream: bool,
79    depended_subscriptions: &HashSet<u32>,
80) -> StateTableOpConsistencyLevel {
81    if !depended_subscriptions.is_empty() {
82        StateTableOpConsistencyLevel::LogStoreEnabled
83    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
84        // Table with overwrite conflict behavior could disable conflict check
85        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
86        StateTableOpConsistencyLevel::Inconsistent
87    } else {
88        StateTableOpConsistencyLevel::ConsistentOldValue
89    }
90}
91
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        // Column descriptors of the materialized table, taken from the catalog.
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Serde for (de)serializing full row values on the conflict-handling path.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // Having any initial dispatcher means some downstream may read this table.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only tables with the iceberg engine bypass the state store entirely;
        // `execute_inner` then only forwards chunks without writing them.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main processing loop: consume input messages, maintain the state table and the
    /// conflict-handling cache, and forward (possibly rewritten) messages downstream.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            // Give the LRU cache a chance to release memory on every message.
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy (iceberg append-only) table: count rows but write nothing to state.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
                    // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
                    // and the conflict behavior is overwrite.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // empty chunk
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                // TODO(st1page): when materialize partial columns(), we should
                                // construct some columns in the pk
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            // Serialize the full row values once for the whole chunk.
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of every visible row into key bytes.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Zip (op, pk, value) triples, dropping invisible rows.
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve pk conflicts against the cache/state table and obtain
                            // the net effect per key.
                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out; emit nothing for this chunk.
                                None => continue,
                            }
                        }
                        // `IgnoreConflict` is always caught by the guarded arm above:
                        // `do_not_handle_conflict` can only be true for `Overwrite`.
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        } // ConflictBehavior::DoUpdateIfNotNull => unimplemented!(),
                    }
                }
                Message::Barrier(b) => {
                    // If a downstream mv depends on the current table, we need to do conflict check again.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level: it may switch when downstream
                    // dependencies or subscriptions change at this barrier.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Update the vnode bitmap for the state table if asked.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        if cache_may_stale {
                            // Scaling may have moved keys to/from this actor; the cached
                            // rows can no longer be trusted, so drop them all.
                            self.materialize_cache.lru_cache.clear();
                        }
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

    /// Maintain `depended_subscriptions` according to the barrier's mutations: add
    /// subscribers newly created on this MV table and remove dropped ones. Duplicate
    /// adds and removals of unknown subscribers are tolerated but logged as warnings.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                // Only subscriptions on *this* table affect the tracked set.
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
366
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Build unnamed column descriptors from the input schema.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns in the generated test table.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        // Arbitrary actor/fragment ids; these metrics are never inspected in tests.
        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            is_dummy_table: false,
            // Assume a downstream exists so conflict handling is exercised in tests.
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
428
429/// Construct output `StreamChunk` from given buffer.
430fn generate_output(
431    change_buffer: ChangeBuffer,
432    data_types: Vec<DataType>,
433) -> StreamExecutorResult<Option<StreamChunk>> {
434    // construct output chunk
435    // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
436    let mut new_ops: Vec<Op> = vec![];
437    let mut new_rows: Vec<Bytes> = vec![];
438    let row_deserializer = RowDeserializer::new(data_types.clone());
439    for (_, row_op) in change_buffer.into_parts() {
440        match row_op {
441            KeyOp::Insert(value) => {
442                new_ops.push(Op::Insert);
443                new_rows.push(value);
444            }
445            KeyOp::Delete(old_value) => {
446                new_ops.push(Op::Delete);
447                new_rows.push(old_value);
448            }
449            KeyOp::Update((old_value, new_value)) => {
450                // if old_value == new_value, we don't need to emit updates to downstream.
451                if old_value != new_value {
452                    new_ops.push(Op::UpdateDelete);
453                    new_ops.push(Op::UpdateInsert);
454                    new_rows.push(old_value);
455                    new_rows.push(new_value);
456                }
457            }
458        }
459    }
460    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
461
462    for row_bytes in new_rows {
463        let res =
464            data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
465        debug_assert!(res.is_none());
466    }
467
468    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
469        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
470        Ok(Some(new_stream_chunk))
471    } else {
472        Ok(None)
473    }
474}
475
/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
/// TODO(rc): merge with `TopNStaging`.
struct ChangeBuffer {
    // Net `KeyOp` per serialized primary key, after folding consecutive ops together.
    buffer: HashMap<Vec<u8>, KeyOp>,
}
481
482impl ChangeBuffer {
483    fn new() -> Self {
484        Self {
485            buffer: HashMap::new(),
486        }
487    }
488
489    fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
490        let entry = self.buffer.entry(pk);
491        match entry {
492            Entry::Vacant(e) => {
493                e.insert(KeyOp::Insert(value));
494            }
495            Entry::Occupied(mut e) => {
496                if let KeyOp::Delete(old_value) = e.get_mut() {
497                    let old_val = std::mem::take(old_value);
498                    e.insert(KeyOp::Update((old_val, value)));
499                } else {
500                    unreachable!();
501                }
502            }
503        }
504    }
505
506    fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
507        let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
508        match entry {
509            Entry::Vacant(e) => {
510                e.insert(KeyOp::Delete(old_value));
511            }
512            Entry::Occupied(mut e) => match e.get_mut() {
513                KeyOp::Insert(_) => {
514                    e.remove();
515                }
516                KeyOp::Update((prev, _curr)) => {
517                    let prev = std::mem::take(prev);
518                    e.insert(KeyOp::Delete(prev));
519                }
520                KeyOp::Delete(_) => {
521                    unreachable!();
522                }
523            },
524        }
525    }
526
527    fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
528        let entry = self.buffer.entry(pk);
529        match entry {
530            Entry::Vacant(e) => {
531                e.insert(KeyOp::Update((old_value, new_value)));
532            }
533            Entry::Occupied(mut e) => match e.get_mut() {
534                KeyOp::Insert(_) => {
535                    e.insert(KeyOp::Insert(new_value));
536                }
537                KeyOp::Update((_prev, curr)) => {
538                    *curr = new_value;
539                }
540                KeyOp::Delete(_) => {
541                    unreachable!()
542                }
543            },
544        }
545    }
546
547    fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
548        self.buffer
549    }
550}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    // Box the inner stream as the message stream driving this actor.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
556
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Only the arrange key indices are printed; the other fields are large or opaque.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
564
/// A cache for materialize executors.
///
/// Maps serialized pk bytes to the current row value, backed by a watermark-driven LRU.
struct MaterializeCache<SD> {
    // LRU from serialized pk to cached value; `None` marks a key known to be absent.
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    // Serde for the full row value, used when conflict handling must inspect columns.
    row_serde: BasicSerde,
    // Index of the version column used to decide overwrites, if any.
    version_column_index: Option<u32>,
    _serde: PhantomData<SD>,
}
572
/// `Some(row)` if the pk currently exists in the table, `None` if it is known absent.
type CacheValue = Option<CompactedRow>;
574
575impl<SD: ValueRowSerde> MaterializeCache<SD> {
576    fn new(
577        watermark_sequence: AtomicU64Ref,
578        metrics_info: MetricsInfo,
579        row_serde: BasicSerde,
580        version_column_index: Option<u32>,
581    ) -> Self {
582        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
583            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
584        Self {
585            lru_cache,
586            row_serde,
587            version_column_index,
588            _serde: PhantomData,
589        }
590    }
591
592    async fn handle<S: StateStore>(
593        &mut self,
594        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
595        table: &StateTableInner<S, SD>,
596        conflict_behavior: ConflictBehavior,
597        metrics: &MaterializeMetrics,
598    ) -> StreamExecutorResult<ChangeBuffer> {
599        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
600
601        let key_set: HashSet<Box<[u8]>> = row_ops
602            .iter()
603            .map(|(_, k, _)| k.as_slice().into())
604            .collect();
605
606        // Populate the LRU cache with the keys in input chunk.
607        // For new keys, row values are set to None.
608        self.fetch_keys(
609            key_set.iter().map(|v| v.deref()),
610            table,
611            conflict_behavior,
612            metrics,
613        )
614        .await?;
615
616        let mut change_buffer = ChangeBuffer::new();
617        let row_serde = self.row_serde.clone();
618        let version_column_index = self.version_column_index;
619        for (op, key, row) in row_ops {
620            match op {
621                Op::Insert | Op::UpdateInsert => {
622                    let Some(old_row) = self.get_expected(&key) else {
623                        // not exists before, meaning no conflict, simply insert
624                        change_buffer.insert(key.clone(), row.clone());
625                        self.lru_cache.put(key, Some(CompactedRow { row }));
626                        continue;
627                    };
628
629                    // now conflict happens, handle it according to the specified behavior
630                    match conflict_behavior {
631                        ConflictBehavior::Overwrite => {
632                            let need_overwrite = if let Some(idx) = version_column_index {
633                                let old_row_deserialized =
634                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
635                                let new_row_deserialized =
636                                    row_serde.deserializer.deserialize(row.clone())?;
637
638                                version_is_newer_or_equal(
639                                    old_row_deserialized.index(idx as usize),
640                                    new_row_deserialized.index(idx as usize),
641                                )
642                            } else {
643                                // no version column specified, just overwrite
644                                true
645                            };
646                            if need_overwrite {
647                                change_buffer.update(key.clone(), old_row.row.clone(), row.clone());
648                                self.lru_cache.put(key.clone(), Some(CompactedRow { row }));
649                            };
650                        }
651                        ConflictBehavior::IgnoreConflict => {
652                            // ignore conflict, do nothing
653                        }
654                        ConflictBehavior::DoUpdateIfNotNull => {
655                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
656                            // TODO(wcy-fdu): find a way to output the resulting new row directly to the downstream chunk, thus avoiding an additional deserialization step.
657
658                            let old_row_deserialized =
659                                row_serde.deserializer.deserialize(old_row.row.clone())?;
660                            let new_row_deserialized =
661                                row_serde.deserializer.deserialize(row.clone())?;
662                            let need_overwrite = if let Some(idx) = version_column_index {
663                                version_is_newer_or_equal(
664                                    old_row_deserialized.index(idx as usize),
665                                    new_row_deserialized.index(idx as usize),
666                                )
667                            } else {
668                                true
669                            };
670
671                            if need_overwrite {
672                                let mut row_deserialized_vec =
673                                    old_row_deserialized.into_inner().into_vec();
674                                replace_if_not_null(
675                                    &mut row_deserialized_vec,
676                                    new_row_deserialized,
677                                );
678                                let updated_row = OwnedRow::new(row_deserialized_vec);
679                                let updated_row_bytes =
680                                    Bytes::from(row_serde.serializer.serialize(updated_row));
681
682                                change_buffer.update(
683                                    key.clone(),
684                                    old_row.row.clone(),
685                                    updated_row_bytes.clone(),
686                                );
687                                self.lru_cache.put(
688                                    key.clone(),
689                                    Some(CompactedRow {
690                                        row: updated_row_bytes,
691                                    }),
692                                );
693                            }
694                        }
695                        _ => unreachable!(),
696                    };
697                }
698
699                Op::Delete | Op::UpdateDelete => {
700                    match conflict_behavior {
701                        checked_conflict_behaviors!() => {
702                            if let Some(old_row) = self.get_expected(&key) {
703                                change_buffer.delete(key.clone(), old_row.row.clone());
704                                // put a None into the cache to represent deletion
705                                self.lru_cache.put(key, None);
706                            } else {
707                                // delete a non-existent value
708                                // this is allowed in the case of mview conflict, so ignore
709                            };
710                        }
711                        _ => unreachable!(),
712                    };
713                }
714            }
715        }
716        Ok(change_buffer)
717    }
718
719    async fn fetch_keys<'a, S: StateStore>(
720        &mut self,
721        keys: impl Iterator<Item = &'a [u8]>,
722        table: &StateTableInner<S, SD>,
723        conflict_behavior: ConflictBehavior,
724        metrics: &MaterializeMetrics,
725    ) -> StreamExecutorResult<()> {
726        let mut futures = vec![];
727        for key in keys {
728            metrics.materialize_cache_total_count.inc();
729
730            if self.lru_cache.contains(key) {
731                metrics.materialize_cache_hit_count.inc();
732                continue;
733            }
734            futures.push(async {
735                let key_row = table.pk_serde().deserialize(key).unwrap();
736                let row = table.get_row(key_row).await?.map(CompactedRow::from);
737                StreamExecutorResult::Ok((key.to_vec(), row))
738            });
739        }
740
741        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
742        while let Some(result) = buffered.next().await {
743            let (key, row) = result?;
744            // for keys that are not in the table, `value` is None
745            match conflict_behavior {
746                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
747                _ => unreachable!(),
748            };
749        }
750
751        Ok(())
752    }
753
754    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
755        self.lru_cache.get(key).unwrap_or_else(|| {
756            panic!(
757                "the key {:?} has not been fetched in the materialize executor's cache ",
758                key
759            )
760        })
761    }
762
    /// Evict entries from the underlying LRU cache according to its own policy.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
766}
767
768/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
769/// column value in the replacement row is not null.
770///
771/// # Example
772///
773/// ```ignore
774/// let mut row = vec![Some(1), None, Some(3)];
775/// let replacement = vec![Some(10), Some(20), None];
776/// replace_if_not_null(&mut row, replacement);
777/// ```
778///
779/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
780fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
781    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
782        if let Some(new_value) = new_col {
783            *old_col = Some(new_value);
784        }
785    }
786}
787
788/// Determines whether pk conflict handling should update an existing row with newly-received value,
789/// according to the value of version column of the new and old rows.
790fn version_is_newer_or_equal(
791    old_version: &Option<ScalarImpl>,
792    new_version: &Option<ScalarImpl>,
793) -> bool {
794    cmp_datum(old_version, new_version, OrderType::ascending_nulls_first()).is_le()
795}
796
797#[cfg(test)]
798mod tests {
799
800    use std::iter;
801    use std::sync::atomic::AtomicU64;
802
803    use rand::rngs::SmallRng;
804    use rand::{Rng, RngCore, SeedableRng};
805    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
806    use risingwave_common::catalog::Field;
807    use risingwave_common::util::epoch::test_epoch;
808    use risingwave_common::util::sort_util::OrderType;
809    use risingwave_hummock_sdk::HummockReadEpoch;
810    use risingwave_storage::memory::MemoryStateStore;
811    use risingwave_storage::table::batch_table::BatchTable;
812
813    use super::*;
814    use crate::executor::test_utils::*;
815
    /// Basic smoke test: with `ConflictBehavior::NoCheck`, rows from the input
    /// chunks are written verbatim and can be read back from the state store
    /// after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
918
    // Regression test: with `Overwrite`, inserting a pk that already exists and then
    // deleting it within the same chunk must leave no row behind.
    // https://github.com/risingwavelabs/risingwave/issues/13346
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // First chunk inserts pk 1.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Second chunk re-inserts pk 1 (overwrites) and then deletes it.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1, the second barrier, and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                // pk 1 was overwritten to (1,2) and then deleted, so no row remains.
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1001
    /// With `ConflictBehavior::Overwrite`, re-inserting an existing pk replaces the
    /// previous value, both within one chunk and across chunks.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts across chunks: pk 1 and pk 2 get overwritten again.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Another conflicting insert on pk 1; sent after the second barrier and not
        // asserted below.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: overwritten to (1,4) in chunk1, then to (1,3) by chunk2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: (2,5) from chunk1 overwritten to (2,6) by chunk2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1120
    /// With `ConflictBehavior::Overwrite`: deletes with a wrong value or a
    /// non-existent pk are tolerated, and updates on a non-existent pk degrade to
    /// inserts.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after update on one pk; the trailing insert overrides the update.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test update with wrong old value, update an inexistent pk
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the third chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Third stream chunk. We check conflicting insert and the wrong-value /
        // wrong-pk updates.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1307
    /// With `ConflictBehavior::IgnoreConflict`, an insert whose pk already exists is
    /// dropped and the first-written value is kept.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        // Conflicting inserts across chunks: both should be ignored.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Another conflicting insert on pk 1; sent after the second barrier and not
        // asserted below.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: first insert (1,3) wins; later conflicting inserts ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: first insert (2,5) wins; chunk2's (2,6) ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1426
    /// With `ConflictBehavior::IgnoreConflict`, an insert after a delete of the same
    /// pk within one chunk must succeed — the delete frees the pk again.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after delete one pk, the latter insert should succeed.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Expect barrier, chunk, barrier in order.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The final insert after the delete must be visible.
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
1519
1520    #[tokio::test]
1521    async fn test_ignore_delete_and_update_conflict() {
1522        // Prepare storage and memtable.
1523        let memory_state_store = MemoryStateStore::new();
1524        let table_id = TableId::new(1);
1525        // Two columns of int32 type, the first column is PK.
1526        let schema = Schema::new(vec![
1527            Field::unnamed(DataType::Int32),
1528            Field::unnamed(DataType::Int32),
1529        ]);
1530        let column_ids = vec![0.into(), 1.into()];
1531
1532        // test double insert one pk, the latter should be ignored.
1533        let chunk1 = StreamChunk::from_pretty(
1534            " i i
1535            + 1 4
1536            + 2 5
1537            + 3 6
1538            U- 8 1
1539            U+ 8 2
1540            + 8 3",
1541        );
1542
1543        // test delete wrong value, delete inexistent pk
1544        let chunk2 = StreamChunk::from_pretty(
1545            " i i
1546            + 7 8
1547            - 3 4
1548            - 5 0",
1549        );
1550
1551        // test delete wrong value, delete inexistent pk
1552        let chunk3 = StreamChunk::from_pretty(
1553            " i i
1554            + 1 5
1555            U- 2 4
1556            U+ 2 8
1557            U- 9 0
1558            U+ 9 1",
1559        );
1560
1561        // Prepare stream executors.
1562        let source = MockSource::with_messages(vec![
1563            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1564            Message::Chunk(chunk1),
1565            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1566            Message::Chunk(chunk2),
1567            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1568            Message::Chunk(chunk3),
1569            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1570        ])
1571        .into_executor(schema.clone(), PkIndices::new());
1572
1573        let order_types = vec![OrderType::ascending()];
1574        let column_descs = vec![
1575            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1576            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1577        ];
1578
1579        let table = BatchTable::for_test(
1580            memory_state_store.clone(),
1581            table_id,
1582            column_descs,
1583            order_types,
1584            vec![0],
1585            vec![0, 1],
1586        );
1587
1588        let mut materialize_executor = MaterializeExecutor::for_test(
1589            source,
1590            memory_state_store,
1591            table_id,
1592            vec![ColumnOrder::new(0, OrderType::ascending())],
1593            column_ids,
1594            Arc::new(AtomicU64::new(0)),
1595            ConflictBehavior::IgnoreConflict,
1596        )
1597        .await
1598        .boxed()
1599        .execute();
1600        materialize_executor.next().await.transpose().unwrap();
1601
1602        materialize_executor.next().await.transpose().unwrap();
1603
1604        // First stream chunk. We check the existence of (3) -> (3,6)
1605        match materialize_executor.next().await.transpose().unwrap() {
1606            Some(Message::Barrier(_)) => {
1607                // can read (8, 2), check insert after update
1608                let row = table
1609                    .get_row(
1610                        &OwnedRow::new(vec![Some(8_i32.into())]),
1611                        HummockReadEpoch::NoWait(u64::MAX),
1612                    )
1613                    .await
1614                    .unwrap();
1615                assert_eq!(
1616                    row,
1617                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1618                );
1619            }
1620            _ => unreachable!(),
1621        }
1622        materialize_executor.next().await.transpose().unwrap();
1623
1624        match materialize_executor.next().await.transpose().unwrap() {
1625            Some(Message::Barrier(_)) => {
1626                let row = table
1627                    .get_row(
1628                        &OwnedRow::new(vec![Some(7_i32.into())]),
1629                        HummockReadEpoch::NoWait(u64::MAX),
1630                    )
1631                    .await
1632                    .unwrap();
1633                assert_eq!(
1634                    row,
1635                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1636                );
1637
1638                // check delete wrong value
1639                let row = table
1640                    .get_row(
1641                        &OwnedRow::new(vec![Some(3_i32.into())]),
1642                        HummockReadEpoch::NoWait(u64::MAX),
1643                    )
1644                    .await
1645                    .unwrap();
1646                assert_eq!(row, None);
1647
1648                // check delete wrong pk
1649                let row = table
1650                    .get_row(
1651                        &OwnedRow::new(vec![Some(5_i32.into())]),
1652                        HummockReadEpoch::NoWait(u64::MAX),
1653                    )
1654                    .await
1655                    .unwrap();
1656                assert_eq!(row, None);
1657            }
1658            _ => unreachable!(),
1659        }
1660
1661        materialize_executor.next().await.transpose().unwrap();
1662        // materialize_executor.next().await.transpose().unwrap();
1663        // Second stream chunk. We check the existence of (7) -> (7,8)
1664        match materialize_executor.next().await.transpose().unwrap() {
1665            Some(Message::Barrier(_)) => {
1666                let row = table
1667                    .get_row(
1668                        &OwnedRow::new(vec![Some(1_i32.into())]),
1669                        HummockReadEpoch::NoWait(u64::MAX),
1670                    )
1671                    .await
1672                    .unwrap();
1673                assert_eq!(
1674                    row,
1675                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1676                );
1677
1678                // check update wrong value
1679                let row = table
1680                    .get_row(
1681                        &OwnedRow::new(vec![Some(2_i32.into())]),
1682                        HummockReadEpoch::NoWait(u64::MAX),
1683                    )
1684                    .await
1685                    .unwrap();
1686                assert_eq!(
1687                    row,
1688                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1689                );
1690
1691                // check update wrong pk, should become insert
1692                let row = table
1693                    .get_row(
1694                        &OwnedRow::new(vec![Some(9_i32.into())]),
1695                        HummockReadEpoch::NoWait(u64::MAX),
1696                    )
1697                    .await
1698                    .unwrap();
1699                assert_eq!(
1700                    row,
1701                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1702                );
1703            }
1704            _ => unreachable!(),
1705        }
1706    }
1707
    /// Exercises `ConflictBehavior::DoUpdateIfNotNull`: on a primary-key
    /// conflict, NULL cells in the incoming row must not overwrite existing
    /// non-NULL values (verified by the assertions on keys 7 and 8 below).
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Expect (8, 2) to survive: the trailing `+ 8 .` conflicts with key 8
        // but its NULL value cell must not overwrite 2. Also inserts (2, NULL).
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8)
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-side reader over the same state store, used to assert on the
        // materialized rows after each barrier.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // After the barrier following chunk1: check the conflict resolution for
        // keys 8 and 2.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // `+ 8 .` conflicted with (8, 2); the NULL must not overwrite 2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                // A fresh insert with a NULL cell is materialized as-is.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();

        // After the barrier following chunk2: (7, 8) inserted; 3 and 5 deleted.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value: `- 3 4` still removes key 3
                // even though the stored value was 6.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk: deleting non-existent key 5 is a no-op.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the third chunk.
        materialize_executor.next().await.transpose().unwrap();
        // After the barrier following chunk3: NULLs still don't overwrite.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // `+ 7 .` conflicted with (7, 8); the NULL must not overwrite 8.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // `U+ 2 .` on (2, NULL): the value stays NULL.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1901
1902    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1903        const KN: u32 = 4;
1904        const SEED: u64 = 998244353;
1905        let mut ret = vec![];
1906        let mut builder =
1907            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1908        let mut rng = SmallRng::seed_from_u64(SEED);
1909
1910        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1911            let len = c.data_chunk().capacity();
1912            let mut c = StreamChunkMut::from(c);
1913            for i in 0..len {
1914                c.set_vis(i, rng.random_bool(0.5));
1915            }
1916            c.into()
1917        };
1918        for _ in 0..row_number {
1919            let k = (rng.next_u32() % KN) as i32;
1920            let v = rng.next_u32() as i32;
1921            let op = if rng.random_bool(0.5) {
1922                Op::Insert
1923            } else {
1924                Op::Delete
1925            };
1926            if let Some(c) =
1927                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1928            {
1929                ret.push(random_vis(c, &mut rng));
1930            }
1931        }
1932        if let Some(c) = builder.take() {
1933            ret.push(random_vis(c, &mut rng));
1934        }
1935        ret
1936    }
1937
1938    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
1939        const N: usize = 100000;
1940
1941        // Prepare storage and memtable.
1942        let memory_state_store = MemoryStateStore::new();
1943        let table_id = TableId::new(1);
1944        // Two columns of int32 type, the first column is PK.
1945        let schema = Schema::new(vec![
1946            Field::unnamed(DataType::Int32),
1947            Field::unnamed(DataType::Int32),
1948        ]);
1949        let column_ids = vec![0.into(), 1.into()];
1950
1951        let chunks = gen_fuzz_data(N, 128);
1952        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
1953            .chain(chunks.into_iter().map(Message::Chunk))
1954            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
1955                test_epoch(2),
1956            ))))
1957            .collect();
1958        // Prepare stream executors.
1959        let source =
1960            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
1961
1962        let mut materialize_executor = MaterializeExecutor::for_test(
1963            source,
1964            memory_state_store.clone(),
1965            table_id,
1966            vec![ColumnOrder::new(0, OrderType::ascending())],
1967            column_ids,
1968            Arc::new(AtomicU64::new(0)),
1969            conflict_behavior,
1970        )
1971        .await
1972        .boxed()
1973        .execute();
1974        materialize_executor.expect_barrier().await;
1975
1976        let order_types = vec![OrderType::ascending()];
1977        let column_descs = vec![
1978            ColumnDesc::unnamed(0.into(), DataType::Int32),
1979            ColumnDesc::unnamed(1.into(), DataType::Int32),
1980        ];
1981        let pk_indices = vec![0];
1982
1983        let mut table = StateTable::from_table_catalog(
1984            &gen_pbtable(
1985                TableId::from(1002),
1986                column_descs.clone(),
1987                order_types,
1988                pk_indices,
1989                0,
1990            ),
1991            memory_state_store.clone(),
1992            None,
1993        )
1994        .await;
1995
1996        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
1997            // check with state table's memtable
1998            table.write_chunk(c);
1999        }
2000    }
2001
    /// Runs the consistency fuzz test with `ConflictBehavior::Overwrite`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2006
    /// Runs the consistency fuzz test with `ConflictBehavior::IgnoreConflict`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2011}