risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27    ColumnDesc, ColumnId, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, RowDeserializer};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
32use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
33use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_storage::mem_table::KeyOp;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
42use crate::common::table::test_utils::gen_pbtable;
43use crate::executor::monitor::MaterializeMetrics;
44use crate::executor::prelude::*;
45
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the stream of changes to materialize.
    input: Executor,

    /// Output schema of this executor (the schema of the materialized view).
    schema: Schema,

    /// State table backing the materialized view.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache of pk-bytes -> row, used to detect and resolve pk conflicts.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve rows arriving with an already-present primary key.
    conflict_behavior: ConflictBehavior,

    /// Index of the column used to decide which conflicting row wins, if configured.
    version_column_index: Option<u32>,

    /// Whether any downstream fragment may depend on this table.
    /// Affects the op consistency level chosen for the state table.
    may_have_downstream: bool,

    /// Ids of subscriptions depending on this table; when non-empty the log store is enabled.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,
}
75
76fn get_op_consistency_level(
77    conflict_behavior: ConflictBehavior,
78    may_have_downstream: bool,
79    depended_subscriptions: &HashSet<u32>,
80) -> StateTableOpConsistencyLevel {
81    if !depended_subscriptions.is_empty() {
82        StateTableOpConsistencyLevel::LogStoreEnabled
83    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
84        // Table with overwrite conflict behavior could disable conflict check
85        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
86        StateTableOpConsistencyLevel::Inconsistent
87    } else {
88        StateTableOpConsistencyLevel::ConsistentOldValue
89    }
90}
91
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Serde for whole rows, restricted to the catalog's value indices.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // If the actor starts with at least one dispatcher, some downstream may already depend on us.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only tables on the iceberg engine bypass hummock entirely; this node becomes a no-op.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main loop: consume input messages, resolve pk conflicts where required,
    /// write rows to the state table, and forward the (possibly rewritten) chunks.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            // Let the LRU cache drop entries below the watermark before handling each message.
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy (iceberg append-only) table: count rows but write nothing to storage.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
                    // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
                    // and the conflict behavior is overwrite.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // empty chunk
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                // TODO(st1page): when materialize partial columns(), we should
                                // construct some columns in the pk
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            // Serialize all rows to value bytes up front.
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of each (visible) row; invisible rows keep an empty key.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Keep only visible rows as (op, pk bytes, value bytes) triples.
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve conflicts against the cache/state table, producing the net changes.
                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out: nothing to write or forward.
                                None => continue,
                            }
                        }
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        } // ConflictBehavior::DoUpdateIfNotNull => unimplemented!(),
                    }
                }
                Message::Barrier(b) => {
                    // If a downstream mv depends on the current table, we need to do conflict check again.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level: dependents may have changed at this barrier.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    // Only Overwrite tables are ever allowed to run inconsistently.
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Update the vnode bitmap for the state table if asked.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        // Vnode ownership changed; cached rows may no longer belong to us.
                        self.materialize_cache.lru_cache.clear();
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

    /// Maintain `depended_subscriptions` according to the subscriptions added to or dropped
    /// from this table by the given barrier. Logs a warning (but does not fail) on a
    /// duplicate add or on dropping a subscription that was never recorded.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
365
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Derive unnamed column descriptors from the input schema and the given ids.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns in the test setup.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        // Dummy actor id (1) and fragment id (2) for metrics.
        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            is_dummy_table: false,
            // Always run with conflict handling on in tests.
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
427
428/// Construct output `StreamChunk` from given buffer.
429fn generate_output(
430    change_buffer: ChangeBuffer,
431    data_types: Vec<DataType>,
432) -> StreamExecutorResult<Option<StreamChunk>> {
433    // construct output chunk
434    // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
435    let mut new_ops: Vec<Op> = vec![];
436    let mut new_rows: Vec<Bytes> = vec![];
437    let row_deserializer = RowDeserializer::new(data_types.clone());
438    for (_, row_op) in change_buffer.into_parts() {
439        match row_op {
440            KeyOp::Insert(value) => {
441                new_ops.push(Op::Insert);
442                new_rows.push(value);
443            }
444            KeyOp::Delete(old_value) => {
445                new_ops.push(Op::Delete);
446                new_rows.push(old_value);
447            }
448            KeyOp::Update((old_value, new_value)) => {
449                // if old_value == new_value, we don't need to emit updates to downstream.
450                if old_value != new_value {
451                    new_ops.push(Op::UpdateDelete);
452                    new_ops.push(Op::UpdateInsert);
453                    new_rows.push(old_value);
454                    new_rows.push(new_value);
455                }
456            }
457        }
458    }
459    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
460
461    for row_bytes in new_rows {
462        let res =
463            data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
464        debug_assert!(res.is_none());
465    }
466
467    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
468        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
469        Ok(Some(new_stream_chunk))
470    } else {
471        Ok(None)
472    }
473}
474
475/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
476/// TODO(rc): merge with `TopNStaging`.
477struct ChangeBuffer {
478    buffer: HashMap<Vec<u8>, KeyOp>,
479}
480
481impl ChangeBuffer {
482    fn new() -> Self {
483        Self {
484            buffer: HashMap::new(),
485        }
486    }
487
488    fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
489        let entry = self.buffer.entry(pk);
490        match entry {
491            Entry::Vacant(e) => {
492                e.insert(KeyOp::Insert(value));
493            }
494            Entry::Occupied(mut e) => {
495                if let KeyOp::Delete(old_value) = e.get_mut() {
496                    let old_val = std::mem::take(old_value);
497                    e.insert(KeyOp::Update((old_val, value)));
498                } else {
499                    unreachable!();
500                }
501            }
502        }
503    }
504
505    fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
506        let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
507        match entry {
508            Entry::Vacant(e) => {
509                e.insert(KeyOp::Delete(old_value));
510            }
511            Entry::Occupied(mut e) => match e.get_mut() {
512                KeyOp::Insert(_) => {
513                    e.remove();
514                }
515                KeyOp::Update((prev, _curr)) => {
516                    let prev = std::mem::take(prev);
517                    e.insert(KeyOp::Delete(prev));
518                }
519                KeyOp::Delete(_) => {
520                    unreachable!();
521                }
522            },
523        }
524    }
525
526    fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
527        let entry = self.buffer.entry(pk);
528        match entry {
529            Entry::Vacant(e) => {
530                e.insert(KeyOp::Update((old_value, new_value)));
531            }
532            Entry::Occupied(mut e) => match e.get_mut() {
533                KeyOp::Insert(_) => {
534                    e.insert(KeyOp::Insert(new_value));
535                }
536                KeyOp::Update((_prev, curr)) => {
537                    *curr = new_value;
538                }
539                KeyOp::Delete(_) => {
540                    unreachable!()
541                }
542            },
543        }
544    }
545
546    fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
547        self.buffer
548    }
549}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    // Entry point: delegates to the `try_stream`-generated state machine in `execute_inner`.
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
555
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    // Intentionally terse: only the arrange key indices are useful for debugging;
    // the remaining fields are large or not `Debug`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
563
/// A cache for materialize executors.
struct MaterializeCache<SD> {
    // pk bytes -> cached row; see `CacheValue` for the meaning of `None`.
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    row_serde: BasicSerde,
    // Column used to compare row versions on conflict, if configured.
    version_column_index: Option<u32>,
    _serde: PhantomData<SD>,
}

// `None` means the key is known to be absent from the table (e.g. deleted).
type CacheValue = Option<CompactedRow>;
573
574impl<SD: ValueRowSerde> MaterializeCache<SD> {
575    fn new(
576        watermark_sequence: AtomicU64Ref,
577        metrics_info: MetricsInfo,
578        row_serde: BasicSerde,
579        version_column_index: Option<u32>,
580    ) -> Self {
581        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
582            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
583        Self {
584            lru_cache,
585            row_serde,
586            version_column_index,
587            _serde: PhantomData,
588        }
589    }
590
591    async fn handle<S: StateStore>(
592        &mut self,
593        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
594        table: &StateTableInner<S, SD>,
595        conflict_behavior: ConflictBehavior,
596        metrics: &MaterializeMetrics,
597    ) -> StreamExecutorResult<ChangeBuffer> {
598        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
599
600        let key_set: HashSet<Box<[u8]>> = row_ops
601            .iter()
602            .map(|(_, k, _)| k.as_slice().into())
603            .collect();
604
605        // Populate the LRU cache with the keys in input chunk.
606        // For new keys, row values are set to None.
607        self.fetch_keys(
608            key_set.iter().map(|v| v.deref()),
609            table,
610            conflict_behavior,
611            metrics,
612        )
613        .await?;
614
615        let mut change_buffer = ChangeBuffer::new();
616        let row_serde = self.row_serde.clone();
617        let version_column_index = self.version_column_index;
618        for (op, key, row) in row_ops {
619            match op {
620                Op::Insert | Op::UpdateInsert => {
621                    let Some(old_row) = self.get_expected(&key) else {
622                        // not exists before, meaning no conflict, simply insert
623                        change_buffer.insert(key.clone(), row.clone());
624                        self.lru_cache.put(key, Some(CompactedRow { row }));
625                        continue;
626                    };
627
628                    // now conflict happens, handle it according to the specified behavior
629                    match conflict_behavior {
630                        ConflictBehavior::Overwrite => {
631                            let need_overwrite = if let Some(idx) = version_column_index {
632                                let old_row_deserialized =
633                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
634                                let new_row_deserialized =
635                                    row_serde.deserializer.deserialize(row.clone())?;
636
637                                version_is_newer_or_equal(
638                                    old_row_deserialized.index(idx as usize),
639                                    new_row_deserialized.index(idx as usize),
640                                )
641                            } else {
642                                // no version column specified, just overwrite
643                                true
644                            };
645                            if need_overwrite {
646                                change_buffer.update(key.clone(), old_row.row.clone(), row.clone());
647                                self.lru_cache.put(key.clone(), Some(CompactedRow { row }));
648                            };
649                        }
650                        ConflictBehavior::IgnoreConflict => {
651                            // ignore conflict, do nothing
652                        }
653                        ConflictBehavior::DoUpdateIfNotNull => {
654                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
655                            // TODO(wcy-fdu): find a way to output the resulting new row directly to the downstream chunk, thus avoiding an additional deserialization step.
656
657                            let old_row_deserialized =
658                                row_serde.deserializer.deserialize(old_row.row.clone())?;
659                            let new_row_deserialized =
660                                row_serde.deserializer.deserialize(row.clone())?;
661                            let need_overwrite = if let Some(idx) = version_column_index {
662                                version_is_newer_or_equal(
663                                    old_row_deserialized.index(idx as usize),
664                                    new_row_deserialized.index(idx as usize),
665                                )
666                            } else {
667                                true
668                            };
669
670                            if need_overwrite {
671                                let mut row_deserialized_vec =
672                                    old_row_deserialized.into_inner().into_vec();
673                                replace_if_not_null(
674                                    &mut row_deserialized_vec,
675                                    new_row_deserialized,
676                                );
677                                let updated_row = OwnedRow::new(row_deserialized_vec);
678                                let updated_row_bytes =
679                                    Bytes::from(row_serde.serializer.serialize(updated_row));
680
681                                change_buffer.update(
682                                    key.clone(),
683                                    old_row.row.clone(),
684                                    updated_row_bytes.clone(),
685                                );
686                                self.lru_cache.put(
687                                    key.clone(),
688                                    Some(CompactedRow {
689                                        row: updated_row_bytes,
690                                    }),
691                                );
692                            }
693                        }
694                        _ => unreachable!(),
695                    };
696                }
697
698                Op::Delete | Op::UpdateDelete => {
699                    match conflict_behavior {
700                        checked_conflict_behaviors!() => {
701                            if let Some(old_row) = self.get_expected(&key) {
702                                change_buffer.delete(key.clone(), old_row.row.clone());
703                                // put a None into the cache to represent deletion
704                                self.lru_cache.put(key, None);
705                            } else {
706                                // delete a non-existent value
707                                // this is allowed in the case of mview conflict, so ignore
708                            };
709                        }
710                        _ => unreachable!(),
711                    };
712                }
713            }
714        }
715        Ok(change_buffer)
716    }
717
718    async fn fetch_keys<'a, S: StateStore>(
719        &mut self,
720        keys: impl Iterator<Item = &'a [u8]>,
721        table: &StateTableInner<S, SD>,
722        conflict_behavior: ConflictBehavior,
723        metrics: &MaterializeMetrics,
724    ) -> StreamExecutorResult<()> {
725        let mut futures = vec![];
726        for key in keys {
727            metrics.materialize_cache_total_count.inc();
728
729            if self.lru_cache.contains(key) {
730                metrics.materialize_cache_hit_count.inc();
731                continue;
732            }
733            futures.push(async {
734                let key_row = table.pk_serde().deserialize(key).unwrap();
735                let row = table.get_row(key_row).await?.map(CompactedRow::from);
736                StreamExecutorResult::Ok((key.to_vec(), row))
737            });
738        }
739
740        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
741        while let Some(result) = buffered.next().await {
742            let (key, row) = result?;
743            // for keys that are not in the table, `value` is None
744            match conflict_behavior {
745                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
746                _ => unreachable!(),
747            };
748        }
749
750        Ok(())
751    }
752
753    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
754        self.lru_cache.get(key).unwrap_or_else(|| {
755            panic!(
756                "the key {:?} has not been fetched in the materialize executor's cache ",
757                key
758            )
759        })
760    }
761
    /// Run a round of eviction on the materialize cache, delegating to the
    /// LRU cache's own eviction policy.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
765}
766
767/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
768/// column value in the replacement row is not null.
769///
770/// # Example
771///
772/// ```ignore
773/// let mut row = vec![Some(1), None, Some(3)];
774/// let replacement = vec![Some(10), Some(20), None];
775/// replace_if_not_null(&mut row, replacement);
776/// ```
777///
778/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
779fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
780    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
781        if let Some(new_value) = new_col {
782            *old_col = Some(new_value);
783        }
784    }
785}
786
787/// Determines whether pk conflict handling should update an existing row with newly-received value,
788/// according to the value of version column of the new and old rows.
789fn version_is_newer_or_equal(
790    old_version: &Option<ScalarImpl>,
791    new_version: &Option<ScalarImpl>,
792) -> bool {
793    cmp_datum(old_version, new_version, OrderType::ascending_nulls_first()).is_le()
794}
795
796#[cfg(test)]
797mod tests {
798
799    use std::iter;
800    use std::sync::atomic::AtomicU64;
801
802    use rand::rngs::SmallRng;
803    use rand::{Rng, RngCore, SeedableRng};
804    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
805    use risingwave_common::catalog::Field;
806    use risingwave_common::util::epoch::test_epoch;
807    use risingwave_common::util::sort_util::OrderType;
808    use risingwave_hummock_sdk::HummockReadEpoch;
809    use risingwave_storage::memory::MemoryStateStore;
810    use risingwave_storage::table::batch_table::BatchTable;
811
812    use super::*;
813    use crate::executor::test_utils::*;
814
    /// End-to-end smoke test of `MaterializeExecutor` with `ConflictBehavior::NoCheck`:
    /// rows written via stream chunks must be readable from the underlying state
    /// table after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same state store, used to verify what was materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
917
    // https://github.com/risingwavelabs/risingwave/issues/13346
    /// With `ConflictBehavior::Overwrite`, an insert followed by a delete of the
    /// same pk within one chunk must leave no row behind.
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite pk 1, then delete it: the pk should end up absent.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume: first barrier, chunk1, second barrier, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // At the final barrier, pk 1 must have been deleted.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1000
    /// With `ConflictBehavior::Overwrite`, a later insert on an existing pk must
    /// overwrite the previously materialized value.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Insert on an already-existing pk. NOTE(review): this chunk (and the
        // barrier after it) is never polled below, so its effect is not asserted.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume: first barrier, chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1 was inserted three times; the last value (3, from chunk2) wins.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2 was overwritten by chunk2: (2,5) -> (2,6).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1119
    /// `ConflictBehavior::Overwrite` with deletes and updates: deleting with a
    /// stale value or a non-existent pk is tolerated, and an update on a
    /// non-existent pk degenerates into an insert.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test insert on existing pk, update with wrong old value, update on inexistent pk
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after the first chunk. We check the existence of (8) -> (8,3)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume the third chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after the third chunk: verify the effects of chunk3.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // insert on existing pk 1 overwrites: (1,4) -> (1,5)
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1306
    /// With `ConflictBehavior::IgnoreConflict`, a later insert on an existing pk
    /// is dropped and the first materialized value is kept.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Insert on an already-existing pk; should be ignored. NOTE(review): this
        // chunk (and the barrier after it) is never polled below.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume: first barrier, chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: the first insert (1,3) is kept; (1,4) and (1,5) are ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: the first insert (2,5) is kept; (2,6) is ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1425
    /// With `ConflictBehavior::IgnoreConflict`, an insert after a delete of the
    /// same pk within one chunk must NOT be ignored: the pk was freed by the
    /// delete, so the second insert takes effect.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after delete one pk, the latter insert should succeed.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain the stream: barrier, chunk, barrier — asserting each message kind.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-insert after the delete must be visible: (1) -> (1,6).
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
1518
1519    #[tokio::test]
1520    async fn test_ignore_delete_and_update_conflict() {
1521        // Prepare storage and memtable.
1522        let memory_state_store = MemoryStateStore::new();
1523        let table_id = TableId::new(1);
1524        // Two columns of int32 type, the first column is PK.
1525        let schema = Schema::new(vec![
1526            Field::unnamed(DataType::Int32),
1527            Field::unnamed(DataType::Int32),
1528        ]);
1529        let column_ids = vec![0.into(), 1.into()];
1530
1531        // test double insert one pk, the latter should be ignored.
1532        let chunk1 = StreamChunk::from_pretty(
1533            " i i
1534            + 1 4
1535            + 2 5
1536            + 3 6
1537            U- 8 1
1538            U+ 8 2
1539            + 8 3",
1540        );
1541
1542        // test delete wrong value, delete inexistent pk
1543        let chunk2 = StreamChunk::from_pretty(
1544            " i i
1545            + 7 8
1546            - 3 4
1547            - 5 0",
1548        );
1549
1550        // test delete wrong value, delete inexistent pk
1551        let chunk3 = StreamChunk::from_pretty(
1552            " i i
1553            + 1 5
1554            U- 2 4
1555            U+ 2 8
1556            U- 9 0
1557            U+ 9 1",
1558        );
1559
1560        // Prepare stream executors.
1561        let source = MockSource::with_messages(vec![
1562            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1563            Message::Chunk(chunk1),
1564            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1565            Message::Chunk(chunk2),
1566            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1567            Message::Chunk(chunk3),
1568            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1569        ])
1570        .into_executor(schema.clone(), PkIndices::new());
1571
1572        let order_types = vec![OrderType::ascending()];
1573        let column_descs = vec![
1574            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1575            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1576        ];
1577
1578        let table = BatchTable::for_test(
1579            memory_state_store.clone(),
1580            table_id,
1581            column_descs,
1582            order_types,
1583            vec![0],
1584            vec![0, 1],
1585        );
1586
1587        let mut materialize_executor = MaterializeExecutor::for_test(
1588            source,
1589            memory_state_store,
1590            table_id,
1591            vec![ColumnOrder::new(0, OrderType::ascending())],
1592            column_ids,
1593            Arc::new(AtomicU64::new(0)),
1594            ConflictBehavior::IgnoreConflict,
1595        )
1596        .await
1597        .boxed()
1598        .execute();
1599        materialize_executor.next().await.transpose().unwrap();
1600
1601        materialize_executor.next().await.transpose().unwrap();
1602
1603        // First stream chunk. We check the existence of (3) -> (3,6)
1604        match materialize_executor.next().await.transpose().unwrap() {
1605            Some(Message::Barrier(_)) => {
1606                // can read (8, 2), check insert after update
1607                let row = table
1608                    .get_row(
1609                        &OwnedRow::new(vec![Some(8_i32.into())]),
1610                        HummockReadEpoch::NoWait(u64::MAX),
1611                    )
1612                    .await
1613                    .unwrap();
1614                assert_eq!(
1615                    row,
1616                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1617                );
1618            }
1619            _ => unreachable!(),
1620        }
1621        materialize_executor.next().await.transpose().unwrap();
1622
1623        match materialize_executor.next().await.transpose().unwrap() {
1624            Some(Message::Barrier(_)) => {
1625                let row = table
1626                    .get_row(
1627                        &OwnedRow::new(vec![Some(7_i32.into())]),
1628                        HummockReadEpoch::NoWait(u64::MAX),
1629                    )
1630                    .await
1631                    .unwrap();
1632                assert_eq!(
1633                    row,
1634                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1635                );
1636
1637                // check delete wrong value
1638                let row = table
1639                    .get_row(
1640                        &OwnedRow::new(vec![Some(3_i32.into())]),
1641                        HummockReadEpoch::NoWait(u64::MAX),
1642                    )
1643                    .await
1644                    .unwrap();
1645                assert_eq!(row, None);
1646
1647                // check delete wrong pk
1648                let row = table
1649                    .get_row(
1650                        &OwnedRow::new(vec![Some(5_i32.into())]),
1651                        HummockReadEpoch::NoWait(u64::MAX),
1652                    )
1653                    .await
1654                    .unwrap();
1655                assert_eq!(row, None);
1656            }
1657            _ => unreachable!(),
1658        }
1659
1660        materialize_executor.next().await.transpose().unwrap();
1661        // materialize_executor.next().await.transpose().unwrap();
1662        // Second stream chunk. We check the existence of (7) -> (7,8)
1663        match materialize_executor.next().await.transpose().unwrap() {
1664            Some(Message::Barrier(_)) => {
1665                let row = table
1666                    .get_row(
1667                        &OwnedRow::new(vec![Some(1_i32.into())]),
1668                        HummockReadEpoch::NoWait(u64::MAX),
1669                    )
1670                    .await
1671                    .unwrap();
1672                assert_eq!(
1673                    row,
1674                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1675                );
1676
1677                // check update wrong value
1678                let row = table
1679                    .get_row(
1680                        &OwnedRow::new(vec![Some(2_i32.into())]),
1681                        HummockReadEpoch::NoWait(u64::MAX),
1682                    )
1683                    .await
1684                    .unwrap();
1685                assert_eq!(
1686                    row,
1687                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1688                );
1689
1690                // check update wrong pk, should become insert
1691                let row = table
1692                    .get_row(
1693                        &OwnedRow::new(vec![Some(9_i32.into())]),
1694                        HummockReadEpoch::NoWait(u64::MAX),
1695                    )
1696                    .await
1697                    .unwrap();
1698                assert_eq!(
1699                    row,
1700                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1701                );
1702            }
1703            _ => unreachable!(),
1704        }
1705    }
1706
1707    #[tokio::test]
1708    async fn test_do_update_if_not_null_conflict() {
1709        // Prepare storage and memtable.
1710        let memory_state_store = MemoryStateStore::new();
1711        let table_id = TableId::new(1);
1712        // Two columns of int32 type, the first column is PK.
1713        let schema = Schema::new(vec![
1714            Field::unnamed(DataType::Int32),
1715            Field::unnamed(DataType::Int32),
1716        ]);
1717        let column_ids = vec![0.into(), 1.into()];
1718
1719        // should get (8, 2)
1720        let chunk1 = StreamChunk::from_pretty(
1721            " i i
1722            + 1 4
1723            + 2 .
1724            + 3 6
1725            U- 8 .
1726            U+ 8 2
1727            + 8 .",
1728        );
1729
1730        // should not get (3, x), should not get (5, 0)
1731        let chunk2 = StreamChunk::from_pretty(
1732            " i i
1733            + 7 8
1734            - 3 4
1735            - 5 0",
1736        );
1737
1738        // should get (2, None), (7, 8)
1739        let chunk3 = StreamChunk::from_pretty(
1740            " i i
1741            + 1 5
1742            + 7 .
1743            U- 2 4
1744            U+ 2 .
1745            U- 9 0
1746            U+ 9 1",
1747        );
1748
1749        // Prepare stream executors.
1750        let source = MockSource::with_messages(vec![
1751            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1752            Message::Chunk(chunk1),
1753            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1754            Message::Chunk(chunk2),
1755            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1756            Message::Chunk(chunk3),
1757            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1758        ])
1759        .into_executor(schema.clone(), PkIndices::new());
1760
1761        let order_types = vec![OrderType::ascending()];
1762        let column_descs = vec![
1763            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1764            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1765        ];
1766
1767        let table = BatchTable::for_test(
1768            memory_state_store.clone(),
1769            table_id,
1770            column_descs,
1771            order_types,
1772            vec![0],
1773            vec![0, 1],
1774        );
1775
1776        let mut materialize_executor = MaterializeExecutor::for_test(
1777            source,
1778            memory_state_store,
1779            table_id,
1780            vec![ColumnOrder::new(0, OrderType::ascending())],
1781            column_ids,
1782            Arc::new(AtomicU64::new(0)),
1783            ConflictBehavior::DoUpdateIfNotNull,
1784        )
1785        .await
1786        .boxed()
1787        .execute();
1788        materialize_executor.next().await.transpose().unwrap();
1789
1790        materialize_executor.next().await.transpose().unwrap();
1791
1792        // First stream chunk. We check the existence of (3) -> (3,6)
1793        match materialize_executor.next().await.transpose().unwrap() {
1794            Some(Message::Barrier(_)) => {
1795                let row = table
1796                    .get_row(
1797                        &OwnedRow::new(vec![Some(8_i32.into())]),
1798                        HummockReadEpoch::NoWait(u64::MAX),
1799                    )
1800                    .await
1801                    .unwrap();
1802                assert_eq!(
1803                    row,
1804                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1805                );
1806
1807                let row = table
1808                    .get_row(
1809                        &OwnedRow::new(vec![Some(2_i32.into())]),
1810                        HummockReadEpoch::NoWait(u64::MAX),
1811                    )
1812                    .await
1813                    .unwrap();
1814                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1815            }
1816            _ => unreachable!(),
1817        }
1818        materialize_executor.next().await.transpose().unwrap();
1819
1820        match materialize_executor.next().await.transpose().unwrap() {
1821            Some(Message::Barrier(_)) => {
1822                let row = table
1823                    .get_row(
1824                        &OwnedRow::new(vec![Some(7_i32.into())]),
1825                        HummockReadEpoch::NoWait(u64::MAX),
1826                    )
1827                    .await
1828                    .unwrap();
1829                assert_eq!(
1830                    row,
1831                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1832                );
1833
1834                // check delete wrong value
1835                let row = table
1836                    .get_row(
1837                        &OwnedRow::new(vec![Some(3_i32.into())]),
1838                        HummockReadEpoch::NoWait(u64::MAX),
1839                    )
1840                    .await
1841                    .unwrap();
1842                assert_eq!(row, None);
1843
1844                // check delete wrong pk
1845                let row = table
1846                    .get_row(
1847                        &OwnedRow::new(vec![Some(5_i32.into())]),
1848                        HummockReadEpoch::NoWait(u64::MAX),
1849                    )
1850                    .await
1851                    .unwrap();
1852                assert_eq!(row, None);
1853            }
1854            _ => unreachable!(),
1855        }
1856
1857        materialize_executor.next().await.transpose().unwrap();
1858        // materialize_executor.next().await.transpose().unwrap();
1859        // Second stream chunk. We check the existence of (7) -> (7,8)
1860        match materialize_executor.next().await.transpose().unwrap() {
1861            Some(Message::Barrier(_)) => {
1862                let row = table
1863                    .get_row(
1864                        &OwnedRow::new(vec![Some(7_i32.into())]),
1865                        HummockReadEpoch::NoWait(u64::MAX),
1866                    )
1867                    .await
1868                    .unwrap();
1869                assert_eq!(
1870                    row,
1871                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1872                );
1873
1874                // check update wrong value
1875                let row = table
1876                    .get_row(
1877                        &OwnedRow::new(vec![Some(2_i32.into())]),
1878                        HummockReadEpoch::NoWait(u64::MAX),
1879                    )
1880                    .await
1881                    .unwrap();
1882                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1883
1884                // check update wrong pk, should become insert
1885                let row = table
1886                    .get_row(
1887                        &OwnedRow::new(vec![Some(9_i32.into())]),
1888                        HummockReadEpoch::NoWait(u64::MAX),
1889                    )
1890                    .await
1891                    .unwrap();
1892                assert_eq!(
1893                    row,
1894                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1895                );
1896            }
1897            _ => unreachable!(),
1898        }
1899    }
1900
1901    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1902        const KN: u32 = 4;
1903        const SEED: u64 = 998244353;
1904        let mut ret = vec![];
1905        let mut builder =
1906            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1907        let mut rng = SmallRng::seed_from_u64(SEED);
1908
1909        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1910            let len = c.data_chunk().capacity();
1911            let mut c = StreamChunkMut::from(c);
1912            for i in 0..len {
1913                c.set_vis(i, rng.random_bool(0.5));
1914            }
1915            c.into()
1916        };
1917        for _ in 0..row_number {
1918            let k = (rng.next_u32() % KN) as i32;
1919            let v = rng.next_u32() as i32;
1920            let op = if rng.random_bool(0.5) {
1921                Op::Insert
1922            } else {
1923                Op::Delete
1924            };
1925            if let Some(c) =
1926                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1927            {
1928                ret.push(random_vis(c, &mut rng));
1929            }
1930        }
1931        if let Some(c) = builder.take() {
1932            ret.push(random_vis(c, &mut rng));
1933        }
1934        ret
1935    }
1936
1937    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
1938        const N: usize = 100000;
1939
1940        // Prepare storage and memtable.
1941        let memory_state_store = MemoryStateStore::new();
1942        let table_id = TableId::new(1);
1943        // Two columns of int32 type, the first column is PK.
1944        let schema = Schema::new(vec![
1945            Field::unnamed(DataType::Int32),
1946            Field::unnamed(DataType::Int32),
1947        ]);
1948        let column_ids = vec![0.into(), 1.into()];
1949
1950        let chunks = gen_fuzz_data(N, 128);
1951        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
1952            .chain(chunks.into_iter().map(Message::Chunk))
1953            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
1954                test_epoch(2),
1955            ))))
1956            .collect();
1957        // Prepare stream executors.
1958        let source =
1959            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
1960
1961        let mut materialize_executor = MaterializeExecutor::for_test(
1962            source,
1963            memory_state_store.clone(),
1964            table_id,
1965            vec![ColumnOrder::new(0, OrderType::ascending())],
1966            column_ids,
1967            Arc::new(AtomicU64::new(0)),
1968            conflict_behavior,
1969        )
1970        .await
1971        .boxed()
1972        .execute();
1973        materialize_executor.expect_barrier().await;
1974
1975        let order_types = vec![OrderType::ascending()];
1976        let column_descs = vec![
1977            ColumnDesc::unnamed(0.into(), DataType::Int32),
1978            ColumnDesc::unnamed(1.into(), DataType::Int32),
1979        ];
1980        let pk_indices = vec![0];
1981
1982        let mut table = StateTable::from_table_catalog(
1983            &gen_pbtable(
1984                TableId::from(1002),
1985                column_descs.clone(),
1986                order_types,
1987                pk_indices,
1988                0,
1989            ),
1990            memory_state_store.clone(),
1991            None,
1992        )
1993        .await;
1994
1995        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
1996            // check with state table's memtable
1997            table.write_chunk(c);
1998        }
1999    }
2000
    /// Fuzz stream consistency with `ConflictBehavior::Overwrite` (upsert).
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2005
    /// Fuzz stream consistency with `ConflictBehavior::IgnoreConflict`.
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2010}