risingwave_stream/executor/mview/
materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{
27    ColumnDesc, ColumnId, ConflictBehavior, TableId, checked_conflict_behaviors,
28};
29use risingwave_common::row::{CompactedRow, RowDeserializer};
30use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
31use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
32use risingwave_common::util::sort_util::{ColumnOrder, OrderType, cmp_datum};
33use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
34use risingwave_pb::catalog::Table;
35use risingwave_pb::catalog::table::Engine;
36use risingwave_storage::mem_table::KeyOp;
37use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
38
39use crate::cache::ManagedLruCache;
40use crate::common::metrics::MetricsInfo;
41use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
42use crate::common::table::test_utils::gen_pbtable;
43use crate::executor::monitor::MaterializeMetrics;
44use crate::executor::prelude::*;
45
46/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    /// Upstream executor producing the change stream to be materialized.
    input: Executor,

    /// Schema of the materialized output (used to deserialize rows when generating output chunks).
    schema: Schema,

    /// State table backing the materialized view on storage.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    /// LRU cache from serialized pk to the current materialized row, used for pk-conflict handling.
    materialize_cache: MaterializeCache<SD>,

    /// How to resolve two rows arriving with the same primary key.
    conflict_behavior: ConflictBehavior,

    /// Index of the version column, if any; on conflict, the stored row is only overwritten
    /// when the new row's version is newer or equal (see `version_is_newer_or_equal`).
    version_column_index: Option<u32>,

    /// Whether some downstream fragment may consume this table's changes.
    /// Together with `depended_subscription_ids`, decides the state table's op consistency level.
    may_have_downstream: bool,

    /// Ids of subscriptions depending on this table; a non-empty set forces
    /// `StateTableOpConsistencyLevel::LogStoreEnabled`.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,

    /// No data will be written to hummock table. This Materialize is just a dummy node.
    /// Used for APPEND ONLY table with iceberg engine. All data will be written to iceberg table directly.
    is_dummy_table: bool,
}
75
76fn get_op_consistency_level(
77    conflict_behavior: ConflictBehavior,
78    may_have_downstream: bool,
79    depended_subscriptions: &HashSet<u32>,
80) -> StateTableOpConsistencyLevel {
81    if !depended_subscriptions.is_empty() {
82        StateTableOpConsistencyLevel::LogStoreEnabled
83    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
84        // Table with overwrite conflict behavior could disable conflict check
85        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
86        StateTableOpConsistencyLevel::Inconsistent
87    } else {
88        StateTableOpConsistencyLevel::ConsistentOldValue
89    }
90}
91
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Row serde built from the catalog's value indices; shared with the materialize cache.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // If the actor starts with at least one dispatcher, a downstream may consume our changes.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        // Append-only iceberg tables skip hummock writes entirely; see `is_dummy_table` docs.
        let is_dummy_table =
            table_catalog.engine == Some(Engine::Iceberg as i32) && table_catalog.append_only;

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            is_dummy_table,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main processing loop: materialize each incoming chunk into the state table,
    /// resolving pk conflicts when required, and forward the resulting messages downstream.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            // Opportunistically evict stale cache entries on every message.
            self.materialize_cache.evict();

            let msg = match msg {
                Message::Watermark(w) => Message::Watermark(w),
                // Dummy table: count the rows but never write to the state table.
                Message::Chunk(chunk) if self.is_dummy_table => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);
                    Message::Chunk(chunk)
                }
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
                    // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
                    // and the conflict behavior is overwrite.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        checked_conflict_behaviors!() if !do_not_handle_conflict => {
                            if chunk.cardinality() == 0 {
                                // empty chunk
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                // TODO(st1page): when materialize partial columns(), we should
                                // construct some columns in the pk
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            // Serialize full rows once; they become the values in the change buffer.
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of every (non-hole) row into per-row byte keys.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            // Zip op/pk/value/visibility together, dropping invisible rows.
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve conflicts against the cache/state table, producing
                            // the net per-key changes.
                            let change_buffer = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(change_buffer, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                // All changes cancelled out; nothing to write or emit.
                                None => continue,
                            }
                        }
                        // `do_not_handle_conflict` can only hold for `Overwrite`, so
                        // `IgnoreConflict` is always taken by the guarded arm above.
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            // Conflict handling skipped: write the chunk through as-is.
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        }
                    }
                }
                Message::Barrier(b) => {
                    // If a downstream mv depends on the current table, we need to do conflict check again.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level: downstream/subscription changes may
                    // require switching at this barrier.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        // Only `Overwrite` tables are ever allowed to run inconsistently.
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Update the vnode bitmap for the state table if asked.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                        && cache_may_stale
                    {
                        // Vnode ownership changed: cached rows may belong to other actors now.
                        self.materialize_cache.lru_cache.clear();
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            };
            yield msg;
        }
    }

    /// Maintain `depended_subscriptions` from the subscription add/drop mutations
    /// carried by `barrier` that target `mv_table_id`. Unexpected duplicates or
    /// missing entries are logged but otherwise tolerated.
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
365
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Build unnamed column descriptors from the input schema and the given column ids.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // All columns are value columns in the test setup.
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None, // no vnode bitmap: singleton distribution
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            is_dummy_table: false,
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
427
428/// Construct output `StreamChunk` from given buffer.
429fn generate_output(
430    change_buffer: ChangeBuffer,
431    data_types: Vec<DataType>,
432) -> StreamExecutorResult<Option<StreamChunk>> {
433    // construct output chunk
434    // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
435    let mut new_ops: Vec<Op> = vec![];
436    let mut new_rows: Vec<Bytes> = vec![];
437    let row_deserializer = RowDeserializer::new(data_types.clone());
438    for (_, row_op) in change_buffer.into_parts() {
439        match row_op {
440            KeyOp::Insert(value) => {
441                new_ops.push(Op::Insert);
442                new_rows.push(value);
443            }
444            KeyOp::Delete(old_value) => {
445                new_ops.push(Op::Delete);
446                new_rows.push(old_value);
447            }
448            KeyOp::Update((old_value, new_value)) => {
449                // if old_value == new_value, we don't need to emit updates to downstream.
450                if old_value != new_value {
451                    new_ops.push(Op::UpdateDelete);
452                    new_ops.push(Op::UpdateInsert);
453                    new_rows.push(old_value);
454                    new_rows.push(new_value);
455                }
456            }
457        }
458    }
459    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
460
461    for row_bytes in new_rows {
462        let res =
463            data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
464        debug_assert!(res.is_none());
465    }
466
467    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
468        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
469        Ok(Some(new_stream_chunk))
470    } else {
471        Ok(None)
472    }
473}
474
475/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
476/// TODO(rc): merge with `TopNStaging`.
/// `ChangeBuffer` is a buffer to handle chunk into `KeyOp`.
/// TODO(rc): merge with `TopNStaging`.
struct ChangeBuffer {
    /// Maps a serialized primary key to the net effect of all changes applied
    /// to that key so far (insert/delete pairs are folded together).
    buffer: HashMap<Vec<u8>, KeyOp>,
}
480
481impl ChangeBuffer {
482    fn new() -> Self {
483        Self {
484            buffer: HashMap::new(),
485        }
486    }
487
488    fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
489        let entry = self.buffer.entry(pk);
490        match entry {
491            Entry::Vacant(e) => {
492                e.insert(KeyOp::Insert(value));
493            }
494            Entry::Occupied(mut e) => {
495                if let KeyOp::Delete(old_value) = e.get_mut() {
496                    let old_val = std::mem::take(old_value);
497                    e.insert(KeyOp::Update((old_val, value)));
498                } else {
499                    unreachable!();
500                }
501            }
502        }
503    }
504
505    fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
506        let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
507        match entry {
508            Entry::Vacant(e) => {
509                e.insert(KeyOp::Delete(old_value));
510            }
511            Entry::Occupied(mut e) => match e.get_mut() {
512                KeyOp::Insert(_) => {
513                    e.remove();
514                }
515                KeyOp::Update((prev, _curr)) => {
516                    let prev = std::mem::take(prev);
517                    e.insert(KeyOp::Delete(prev));
518                }
519                KeyOp::Delete(_) => {
520                    unreachable!();
521                }
522            },
523        }
524    }
525
526    fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
527        let entry = self.buffer.entry(pk);
528        match entry {
529            Entry::Vacant(e) => {
530                e.insert(KeyOp::Update((old_value, new_value)));
531            }
532            Entry::Occupied(mut e) => match e.get_mut() {
533                KeyOp::Insert(_) => {
534                    e.insert(KeyOp::Insert(new_value));
535                }
536                KeyOp::Update((_prev, curr)) => {
537                    *curr = new_value;
538                }
539                KeyOp::Delete(_) => {
540                    unreachable!()
541                }
542            },
543        }
544    }
545
546    fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
547        self.buffer
548    }
549}
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        // Delegate to the `#[try_stream]`-generated `execute_inner`.
        self.execute_inner().boxed()
    }
}
555
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the arrange key indices are useful for identification; the rest
        // of the fields are runtime state.
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
563
564/// A cache for materialize executors.
/// A cache for materialize executors.
struct MaterializeCache<SD> {
    /// LRU map from serialized pk to the cached row (`None` = known absent/deleted).
    lru_cache: ManagedLruCache<Vec<u8>, CacheValue>,
    /// Serde used to (de)serialize full rows when resolving conflicts.
    row_serde: BasicSerde,
    /// Index of the version column used for conflict resolution, if any.
    version_column_index: Option<u32>,
    // Ties the cache to the executor's row serde type without storing an SD value.
    _serde: PhantomData<SD>,
}
571
/// Cached state for a pk: `Some` holds the current row in compacted form,
/// `None` marks a key known to be absent from the table (or just deleted).
type CacheValue = Option<CompactedRow>;
573
574impl<SD: ValueRowSerde> MaterializeCache<SD> {
575    fn new(
576        watermark_sequence: AtomicU64Ref,
577        metrics_info: MetricsInfo,
578        row_serde: BasicSerde,
579        version_column_index: Option<u32>,
580    ) -> Self {
581        let lru_cache: ManagedLruCache<Vec<u8>, CacheValue> =
582            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
583        Self {
584            lru_cache,
585            row_serde,
586            version_column_index,
587            _serde: PhantomData,
588        }
589    }
590
591    async fn handle<S: StateStore>(
592        &mut self,
593        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
594        table: &StateTableInner<S, SD>,
595        conflict_behavior: ConflictBehavior,
596        metrics: &MaterializeMetrics,
597    ) -> StreamExecutorResult<ChangeBuffer> {
598        assert_matches!(conflict_behavior, checked_conflict_behaviors!());
599
600        let key_set: HashSet<Box<[u8]>> = row_ops
601            .iter()
602            .map(|(_, k, _)| k.as_slice().into())
603            .collect();
604
605        // Populate the LRU cache with the keys in input chunk.
606        // For new keys, row values are set to None.
607        self.fetch_keys(
608            key_set.iter().map(|v| v.deref()),
609            table,
610            conflict_behavior,
611            metrics,
612        )
613        .await?;
614
615        let mut change_buffer = ChangeBuffer::new();
616        let row_serde = self.row_serde.clone();
617        let version_column_index = self.version_column_index;
618        for (op, key, row) in row_ops {
619            match op {
620                Op::Insert | Op::UpdateInsert => {
621                    let Some(old_row) = self.get_expected(&key) else {
622                        // not exists before, meaning no conflict, simply insert
623                        change_buffer.insert(key.clone(), row.clone());
624                        self.lru_cache.put(key, Some(CompactedRow { row }));
625                        continue;
626                    };
627
628                    // now conflict happens, handle it according to the specified behavior
629                    match conflict_behavior {
630                        ConflictBehavior::Overwrite => {
631                            let need_overwrite = if let Some(idx) = version_column_index {
632                                let old_row_deserialized =
633                                    row_serde.deserializer.deserialize(old_row.row.clone())?;
634                                let new_row_deserialized =
635                                    row_serde.deserializer.deserialize(row.clone())?;
636
637                                version_is_newer_or_equal(
638                                    old_row_deserialized.index(idx as usize),
639                                    new_row_deserialized.index(idx as usize),
640                                )
641                            } else {
642                                // no version column specified, just overwrite
643                                true
644                            };
645                            if need_overwrite {
646                                change_buffer.update(key.clone(), old_row.row.clone(), row.clone());
647                                self.lru_cache.put(key.clone(), Some(CompactedRow { row }));
648                            };
649                        }
650                        ConflictBehavior::IgnoreConflict => {
651                            // ignore conflict, do nothing
652                        }
653                        ConflictBehavior::DoUpdateIfNotNull => {
654                            // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
655                            // TODO(wcy-fdu): find a way to output the resulting new row directly to the downstream chunk, thus avoiding an additional deserialization step.
656
657                            let old_row_deserialized =
658                                row_serde.deserializer.deserialize(old_row.row.clone())?;
659                            let new_row_deserialized =
660                                row_serde.deserializer.deserialize(row.clone())?;
661                            let need_overwrite = if let Some(idx) = version_column_index {
662                                version_is_newer_or_equal(
663                                    old_row_deserialized.index(idx as usize),
664                                    new_row_deserialized.index(idx as usize),
665                                )
666                            } else {
667                                true
668                            };
669
670                            if need_overwrite {
671                                let mut row_deserialized_vec =
672                                    old_row_deserialized.into_inner().into_vec();
673                                replace_if_not_null(
674                                    &mut row_deserialized_vec,
675                                    new_row_deserialized,
676                                );
677                                let updated_row = OwnedRow::new(row_deserialized_vec);
678                                let updated_row_bytes =
679                                    Bytes::from(row_serde.serializer.serialize(updated_row));
680
681                                change_buffer.update(
682                                    key.clone(),
683                                    old_row.row.clone(),
684                                    updated_row_bytes.clone(),
685                                );
686                                self.lru_cache.put(
687                                    key.clone(),
688                                    Some(CompactedRow {
689                                        row: updated_row_bytes,
690                                    }),
691                                );
692                            }
693                        }
694                        _ => unreachable!(),
695                    };
696                }
697
698                Op::Delete | Op::UpdateDelete => {
699                    match conflict_behavior {
700                        checked_conflict_behaviors!() => {
701                            if let Some(old_row) = self.get_expected(&key) {
702                                change_buffer.delete(key.clone(), old_row.row.clone());
703                                // put a None into the cache to represent deletion
704                                self.lru_cache.put(key, None);
705                            } else {
706                                // delete a non-existent value
707                                // this is allowed in the case of mview conflict, so ignore
708                            };
709                        }
710                        _ => unreachable!(),
711                    };
712                }
713            }
714        }
715        Ok(change_buffer)
716    }
717
718    async fn fetch_keys<'a, S: StateStore>(
719        &mut self,
720        keys: impl Iterator<Item = &'a [u8]>,
721        table: &StateTableInner<S, SD>,
722        conflict_behavior: ConflictBehavior,
723        metrics: &MaterializeMetrics,
724    ) -> StreamExecutorResult<()> {
725        let mut futures = vec![];
726        for key in keys {
727            metrics.materialize_cache_total_count.inc();
728
729            if self.lru_cache.contains(key) {
730                if self.lru_cache.get(key).unwrap().is_some() {
731                    metrics.materialize_data_exist_count.inc();
732                }
733                metrics.materialize_cache_hit_count.inc();
734                continue;
735            }
736            futures.push(async {
737                let key_row = table.pk_serde().deserialize(key).unwrap();
738                let row = table.get_row(key_row).await?.map(CompactedRow::from);
739                StreamExecutorResult::Ok((key.to_vec(), row))
740            });
741        }
742
743        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
744        while let Some(result) = buffered.next().await {
745            let (key, row) = result?;
746            if row.is_some() {
747                metrics.materialize_data_exist_count.inc();
748            }
749            // for keys that are not in the table, `value` is None
750            match conflict_behavior {
751                checked_conflict_behaviors!() => self.lru_cache.put(key, row),
752                _ => unreachable!(),
753            };
754        }
755
756        Ok(())
757    }
758
759    fn get_expected(&mut self, key: &[u8]) -> &CacheValue {
760        self.lru_cache.get(key).unwrap_or_else(|| {
761            panic!(
762                "the key {:?} has not been fetched in the materialize executor's cache ",
763                key
764            )
765        })
766    }
767
    /// Evict entries from the LRU cache according to its eviction policy.
    fn evict(&mut self) {
        self.lru_cache.evict()
    }
771}
772
773/// Replace columns in an existing row with the corresponding columns in a replacement row, if the
774/// column value in the replacement row is not null.
775///
776/// # Example
777///
778/// ```ignore
779/// let mut row = vec![Some(1), None, Some(3)];
780/// let replacement = vec![Some(10), Some(20), None];
781/// replace_if_not_null(&mut row, replacement);
782/// ```
783///
784/// After the call, `row` will be `[Some(10), Some(20), Some(3)]`.
785fn replace_if_not_null(row: &mut Vec<Option<ScalarImpl>>, replacement: OwnedRow) {
786    for (old_col, new_col) in row.iter_mut().zip_eq_fast(replacement) {
787        if let Some(new_value) = new_col {
788            *old_col = Some(new_value);
789        }
790    }
791}
792
793/// Determines whether pk conflict handling should update an existing row with newly-received value,
794/// according to the value of version column of the new and old rows.
795fn version_is_newer_or_equal(
796    old_version: &Option<ScalarImpl>,
797    new_version: &Option<ScalarImpl>,
798) -> bool {
799    cmp_datum(old_version, new_version, OrderType::ascending_nulls_first()).is_le()
800}
801
802#[cfg(test)]
803mod tests {
804
805    use std::iter;
806    use std::sync::atomic::AtomicU64;
807
808    use rand::rngs::SmallRng;
809    use rand::{Rng, RngCore, SeedableRng};
810    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
811    use risingwave_common::catalog::Field;
812    use risingwave_common::util::epoch::test_epoch;
813    use risingwave_common::util::sort_util::OrderType;
814    use risingwave_hummock_sdk::HummockReadEpoch;
815    use risingwave_storage::memory::MemoryStateStore;
816    use risingwave_storage::table::batch_table::BatchTable;
817
818    use super::*;
819    use crate::executor::test_utils::*;
820
    /// End-to-end test of the materialize executor with `ConflictBehavior::NoCheck`:
    /// feeds two chunks separated by barriers and verifies the rows become visible in
    /// the underlying state table after each barrier.
    #[tokio::test]
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same storage, used to verify the materialized rows.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier (epoch 1).
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume the second chunk.
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
923
    // https://github.com/risingwavelabs/risingwave/issues/13346
    /// Regression test: with `ConflictBehavior::Overwrite`, inserting a pk that already
    /// exists and then deleting it within the same chunk must leave no row behind.
    #[tokio::test]
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite (1,1) with (1,2), then delete it — pk 1 should end up absent.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1, the second barrier, and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // After the final barrier, pk 1 must have been deleted.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1006
    /// `ConflictBehavior::Overwrite`: a later insert on an existing pk replaces the
    /// previous row, both within a single chunk and across chunks.
    #[tokio::test]
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Another conflicting insert on pk 1; it is sent after the checked barrier and
        // its effect is not verified by this test.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: (1,3) -> (1,4) within chunk1, then overwritten to (1,3) by chunk2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: (2,5) overwritten to (2,6) by chunk2.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1125
    /// `ConflictBehavior::Overwrite` with deletes and updates: deleting with a wrong
    /// value or an inexistent pk is tolerated, and an update on an inexistent pk
    /// degenerates into an insert.
    #[tokio::test]
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Plain inserts, then an update on pk 8 followed by a conflicting re-insert;
        // with Overwrite the last write (8,3) wins.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete wrong value, delete inexistent pk
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // Insert over an existing pk, update with a wrong old value, and update an
        // inexistent pk (which should behave like an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Third stream chunk: verify insert-over-existing-pk, update with a wrong old
        // value, and update on an inexistent pk.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1312
    /// `ConflictBehavior::IgnoreConflict`: an insert on an already-existing pk is
    /// dropped, keeping the first-written row.
    #[tokio::test]
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // Another conflicting insert on pk 1; it is sent after the checked barrier and
        // its effect is not verified by this test.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1 and chunk2.
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: the first insert (1,3) is kept; later conflicting inserts are ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: (2,5) is kept; chunk2's (2,6) is ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1431
    /// `ConflictBehavior::IgnoreConflict`: deleting a pk and re-inserting it in the
    /// same chunk must let the later insert through — it is no longer a conflict.
    #[tokio::test]
    async fn test_ignore_delete_then_insert() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after delete one pk, the latter insert should succeed.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        // Consume the chunk.
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        // Consume the second barrier, after which the state is committed.
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The re-insert after the delete must be visible: (1) -> (1,6).
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
1524
1525    #[tokio::test]
1526    async fn test_ignore_delete_and_update_conflict() {
1527        // Prepare storage and memtable.
1528        let memory_state_store = MemoryStateStore::new();
1529        let table_id = TableId::new(1);
1530        // Two columns of int32 type, the first column is PK.
1531        let schema = Schema::new(vec![
1532            Field::unnamed(DataType::Int32),
1533            Field::unnamed(DataType::Int32),
1534        ]);
1535        let column_ids = vec![0.into(), 1.into()];
1536
1537        // test double insert one pk, the latter should be ignored.
1538        let chunk1 = StreamChunk::from_pretty(
1539            " i i
1540            + 1 4
1541            + 2 5
1542            + 3 6
1543            U- 8 1
1544            U+ 8 2
1545            + 8 3",
1546        );
1547
1548        // test delete wrong value, delete inexistent pk
1549        let chunk2 = StreamChunk::from_pretty(
1550            " i i
1551            + 7 8
1552            - 3 4
1553            - 5 0",
1554        );
1555
1556        // test delete wrong value, delete inexistent pk
1557        let chunk3 = StreamChunk::from_pretty(
1558            " i i
1559            + 1 5
1560            U- 2 4
1561            U+ 2 8
1562            U- 9 0
1563            U+ 9 1",
1564        );
1565
1566        // Prepare stream executors.
1567        let source = MockSource::with_messages(vec![
1568            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1569            Message::Chunk(chunk1),
1570            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1571            Message::Chunk(chunk2),
1572            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1573            Message::Chunk(chunk3),
1574            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1575        ])
1576        .into_executor(schema.clone(), PkIndices::new());
1577
1578        let order_types = vec![OrderType::ascending()];
1579        let column_descs = vec![
1580            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1581            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1582        ];
1583
1584        let table = BatchTable::for_test(
1585            memory_state_store.clone(),
1586            table_id,
1587            column_descs,
1588            order_types,
1589            vec![0],
1590            vec![0, 1],
1591        );
1592
1593        let mut materialize_executor = MaterializeExecutor::for_test(
1594            source,
1595            memory_state_store,
1596            table_id,
1597            vec![ColumnOrder::new(0, OrderType::ascending())],
1598            column_ids,
1599            Arc::new(AtomicU64::new(0)),
1600            ConflictBehavior::IgnoreConflict,
1601        )
1602        .await
1603        .boxed()
1604        .execute();
1605        materialize_executor.next().await.transpose().unwrap();
1606
1607        materialize_executor.next().await.transpose().unwrap();
1608
1609        // First stream chunk. We check the existence of (3) -> (3,6)
1610        match materialize_executor.next().await.transpose().unwrap() {
1611            Some(Message::Barrier(_)) => {
1612                // can read (8, 2), check insert after update
1613                let row = table
1614                    .get_row(
1615                        &OwnedRow::new(vec![Some(8_i32.into())]),
1616                        HummockReadEpoch::NoWait(u64::MAX),
1617                    )
1618                    .await
1619                    .unwrap();
1620                assert_eq!(
1621                    row,
1622                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1623                );
1624            }
1625            _ => unreachable!(),
1626        }
1627        materialize_executor.next().await.transpose().unwrap();
1628
1629        match materialize_executor.next().await.transpose().unwrap() {
1630            Some(Message::Barrier(_)) => {
1631                let row = table
1632                    .get_row(
1633                        &OwnedRow::new(vec![Some(7_i32.into())]),
1634                        HummockReadEpoch::NoWait(u64::MAX),
1635                    )
1636                    .await
1637                    .unwrap();
1638                assert_eq!(
1639                    row,
1640                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1641                );
1642
1643                // check delete wrong value
1644                let row = table
1645                    .get_row(
1646                        &OwnedRow::new(vec![Some(3_i32.into())]),
1647                        HummockReadEpoch::NoWait(u64::MAX),
1648                    )
1649                    .await
1650                    .unwrap();
1651                assert_eq!(row, None);
1652
1653                // check delete wrong pk
1654                let row = table
1655                    .get_row(
1656                        &OwnedRow::new(vec![Some(5_i32.into())]),
1657                        HummockReadEpoch::NoWait(u64::MAX),
1658                    )
1659                    .await
1660                    .unwrap();
1661                assert_eq!(row, None);
1662            }
1663            _ => unreachable!(),
1664        }
1665
1666        materialize_executor.next().await.transpose().unwrap();
1667        // materialize_executor.next().await.transpose().unwrap();
1668        // Second stream chunk. We check the existence of (7) -> (7,8)
1669        match materialize_executor.next().await.transpose().unwrap() {
1670            Some(Message::Barrier(_)) => {
1671                let row = table
1672                    .get_row(
1673                        &OwnedRow::new(vec![Some(1_i32.into())]),
1674                        HummockReadEpoch::NoWait(u64::MAX),
1675                    )
1676                    .await
1677                    .unwrap();
1678                assert_eq!(
1679                    row,
1680                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
1681                );
1682
1683                // check update wrong value
1684                let row = table
1685                    .get_row(
1686                        &OwnedRow::new(vec![Some(2_i32.into())]),
1687                        HummockReadEpoch::NoWait(u64::MAX),
1688                    )
1689                    .await
1690                    .unwrap();
1691                assert_eq!(
1692                    row,
1693                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
1694                );
1695
1696                // check update wrong pk, should become insert
1697                let row = table
1698                    .get_row(
1699                        &OwnedRow::new(vec![Some(9_i32.into())]),
1700                        HummockReadEpoch::NoWait(u64::MAX),
1701                    )
1702                    .await
1703                    .unwrap();
1704                assert_eq!(
1705                    row,
1706                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1707                );
1708            }
1709            _ => unreachable!(),
1710        }
1711    }
1712
1713    #[tokio::test]
1714    async fn test_do_update_if_not_null_conflict() {
1715        // Prepare storage and memtable.
1716        let memory_state_store = MemoryStateStore::new();
1717        let table_id = TableId::new(1);
1718        // Two columns of int32 type, the first column is PK.
1719        let schema = Schema::new(vec![
1720            Field::unnamed(DataType::Int32),
1721            Field::unnamed(DataType::Int32),
1722        ]);
1723        let column_ids = vec![0.into(), 1.into()];
1724
1725        // should get (8, 2)
1726        let chunk1 = StreamChunk::from_pretty(
1727            " i i
1728            + 1 4
1729            + 2 .
1730            + 3 6
1731            U- 8 .
1732            U+ 8 2
1733            + 8 .",
1734        );
1735
1736        // should not get (3, x), should not get (5, 0)
1737        let chunk2 = StreamChunk::from_pretty(
1738            " i i
1739            + 7 8
1740            - 3 4
1741            - 5 0",
1742        );
1743
1744        // should get (2, None), (7, 8)
1745        let chunk3 = StreamChunk::from_pretty(
1746            " i i
1747            + 1 5
1748            + 7 .
1749            U- 2 4
1750            U+ 2 .
1751            U- 9 0
1752            U+ 9 1",
1753        );
1754
1755        // Prepare stream executors.
1756        let source = MockSource::with_messages(vec![
1757            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
1758            Message::Chunk(chunk1),
1759            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
1760            Message::Chunk(chunk2),
1761            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
1762            Message::Chunk(chunk3),
1763            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
1764        ])
1765        .into_executor(schema.clone(), PkIndices::new());
1766
1767        let order_types = vec![OrderType::ascending()];
1768        let column_descs = vec![
1769            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
1770            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
1771        ];
1772
1773        let table = BatchTable::for_test(
1774            memory_state_store.clone(),
1775            table_id,
1776            column_descs,
1777            order_types,
1778            vec![0],
1779            vec![0, 1],
1780        );
1781
1782        let mut materialize_executor = MaterializeExecutor::for_test(
1783            source,
1784            memory_state_store,
1785            table_id,
1786            vec![ColumnOrder::new(0, OrderType::ascending())],
1787            column_ids,
1788            Arc::new(AtomicU64::new(0)),
1789            ConflictBehavior::DoUpdateIfNotNull,
1790        )
1791        .await
1792        .boxed()
1793        .execute();
1794        materialize_executor.next().await.transpose().unwrap();
1795
1796        materialize_executor.next().await.transpose().unwrap();
1797
1798        // First stream chunk. We check the existence of (3) -> (3,6)
1799        match materialize_executor.next().await.transpose().unwrap() {
1800            Some(Message::Barrier(_)) => {
1801                let row = table
1802                    .get_row(
1803                        &OwnedRow::new(vec![Some(8_i32.into())]),
1804                        HummockReadEpoch::NoWait(u64::MAX),
1805                    )
1806                    .await
1807                    .unwrap();
1808                assert_eq!(
1809                    row,
1810                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
1811                );
1812
1813                let row = table
1814                    .get_row(
1815                        &OwnedRow::new(vec![Some(2_i32.into())]),
1816                        HummockReadEpoch::NoWait(u64::MAX),
1817                    )
1818                    .await
1819                    .unwrap();
1820                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1821            }
1822            _ => unreachable!(),
1823        }
1824        materialize_executor.next().await.transpose().unwrap();
1825
1826        match materialize_executor.next().await.transpose().unwrap() {
1827            Some(Message::Barrier(_)) => {
1828                let row = table
1829                    .get_row(
1830                        &OwnedRow::new(vec![Some(7_i32.into())]),
1831                        HummockReadEpoch::NoWait(u64::MAX),
1832                    )
1833                    .await
1834                    .unwrap();
1835                assert_eq!(
1836                    row,
1837                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1838                );
1839
1840                // check delete wrong value
1841                let row = table
1842                    .get_row(
1843                        &OwnedRow::new(vec![Some(3_i32.into())]),
1844                        HummockReadEpoch::NoWait(u64::MAX),
1845                    )
1846                    .await
1847                    .unwrap();
1848                assert_eq!(row, None);
1849
1850                // check delete wrong pk
1851                let row = table
1852                    .get_row(
1853                        &OwnedRow::new(vec![Some(5_i32.into())]),
1854                        HummockReadEpoch::NoWait(u64::MAX),
1855                    )
1856                    .await
1857                    .unwrap();
1858                assert_eq!(row, None);
1859            }
1860            _ => unreachable!(),
1861        }
1862
1863        materialize_executor.next().await.transpose().unwrap();
1864        // materialize_executor.next().await.transpose().unwrap();
1865        // Second stream chunk. We check the existence of (7) -> (7,8)
1866        match materialize_executor.next().await.transpose().unwrap() {
1867            Some(Message::Barrier(_)) => {
1868                let row = table
1869                    .get_row(
1870                        &OwnedRow::new(vec![Some(7_i32.into())]),
1871                        HummockReadEpoch::NoWait(u64::MAX),
1872                    )
1873                    .await
1874                    .unwrap();
1875                assert_eq!(
1876                    row,
1877                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
1878                );
1879
1880                // check update wrong value
1881                let row = table
1882                    .get_row(
1883                        &OwnedRow::new(vec![Some(2_i32.into())]),
1884                        HummockReadEpoch::NoWait(u64::MAX),
1885                    )
1886                    .await
1887                    .unwrap();
1888                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
1889
1890                // check update wrong pk, should become insert
1891                let row = table
1892                    .get_row(
1893                        &OwnedRow::new(vec![Some(9_i32.into())]),
1894                        HummockReadEpoch::NoWait(u64::MAX),
1895                    )
1896                    .await
1897                    .unwrap();
1898                assert_eq!(
1899                    row,
1900                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
1901                );
1902            }
1903            _ => unreachable!(),
1904        }
1905    }
1906
1907    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1908        const KN: u32 = 4;
1909        const SEED: u64 = 998244353;
1910        let mut ret = vec![];
1911        let mut builder =
1912            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1913        let mut rng = SmallRng::seed_from_u64(SEED);
1914
1915        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1916            let len = c.data_chunk().capacity();
1917            let mut c = StreamChunkMut::from(c);
1918            for i in 0..len {
1919                c.set_vis(i, rng.random_bool(0.5));
1920            }
1921            c.into()
1922        };
1923        for _ in 0..row_number {
1924            let k = (rng.next_u32() % KN) as i32;
1925            let v = rng.next_u32() as i32;
1926            let op = if rng.random_bool(0.5) {
1927                Op::Insert
1928            } else {
1929                Op::Delete
1930            };
1931            if let Some(c) =
1932                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1933            {
1934                ret.push(random_vis(c, &mut rng));
1935            }
1936        }
1937        if let Some(c) = builder.take() {
1938            ret.push(random_vis(c, &mut rng));
1939        }
1940        ret
1941    }
1942
1943    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
1944        const N: usize = 100000;
1945
1946        // Prepare storage and memtable.
1947        let memory_state_store = MemoryStateStore::new();
1948        let table_id = TableId::new(1);
1949        // Two columns of int32 type, the first column is PK.
1950        let schema = Schema::new(vec![
1951            Field::unnamed(DataType::Int32),
1952            Field::unnamed(DataType::Int32),
1953        ]);
1954        let column_ids = vec![0.into(), 1.into()];
1955
1956        let chunks = gen_fuzz_data(N, 128);
1957        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
1958            .chain(chunks.into_iter().map(Message::Chunk))
1959            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
1960                test_epoch(2),
1961            ))))
1962            .collect();
1963        // Prepare stream executors.
1964        let source =
1965            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());
1966
1967        let mut materialize_executor = MaterializeExecutor::for_test(
1968            source,
1969            memory_state_store.clone(),
1970            table_id,
1971            vec![ColumnOrder::new(0, OrderType::ascending())],
1972            column_ids,
1973            Arc::new(AtomicU64::new(0)),
1974            conflict_behavior,
1975        )
1976        .await
1977        .boxed()
1978        .execute();
1979        materialize_executor.expect_barrier().await;
1980
1981        let order_types = vec![OrderType::ascending()];
1982        let column_descs = vec![
1983            ColumnDesc::unnamed(0.into(), DataType::Int32),
1984            ColumnDesc::unnamed(1.into(), DataType::Int32),
1985        ];
1986        let pk_indices = vec![0];
1987
1988        let mut table = StateTable::from_table_catalog(
1989            &gen_pbtable(
1990                TableId::from(1002),
1991                column_descs.clone(),
1992                order_types,
1993                pk_indices,
1994                0,
1995            ),
1996            memory_state_store.clone(),
1997            None,
1998        )
1999        .await;
2000
2001        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
2002            // check with state table's memtable
2003            table.write_chunk(c);
2004        }
2005    }
2006
    /// Fuzz: with `Overwrite` conflict handling, the materialized output
    /// stream must stay self-consistent (checked by the inner helper).
    #[tokio::test]
    async fn fuzz_test_stream_consistent_upsert() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
    }
2011
    /// Fuzz: with `IgnoreConflict` handling, the materialized output stream
    /// must stay self-consistent (checked by the inner helper).
    #[tokio::test]
    async fn fuzz_test_stream_consistent_ignore() {
        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
    }
2016}