// risingwave_stream/executor/mview/materialize.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::Ordering;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::marker::PhantomData;
19use std::ops::{Deref, Index};
20
21use bytes::Bytes;
22use futures::stream;
23use itertools::Itertools;
24use risingwave_common::array::Op;
25use risingwave_common::bitmap::Bitmap;
26use risingwave_common::catalog::{ColumnDesc, ColumnId, ConflictBehavior, TableId};
27use risingwave_common::row::{CompactedRow, RowDeserializer};
28use risingwave_common::types::DefaultOrd;
29use risingwave_common::util::chunk_coalesce::DataChunkBuilder;
30use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
31use risingwave_common::util::sort_util::ColumnOrder;
32use risingwave_common::util::value_encoding::{BasicSerde, ValueRowSerializer};
33use risingwave_pb::catalog::Table;
34use risingwave_storage::mem_table::KeyOp;
35use risingwave_storage::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
36
37use crate::cache::ManagedLruCache;
38use crate::common::metrics::MetricsInfo;
39use crate::common::table::state_table::{StateTableInner, StateTableOpConsistencyLevel};
40use crate::common::table::test_utils::gen_pbtable;
41use crate::executor::monitor::MaterializeMetrics;
42use crate::executor::prelude::*;
43
/// `MaterializeExecutor` materializes changes in stream into a materialized view on storage.
pub struct MaterializeExecutor<S: StateStore, SD: ValueRowSerde> {
    // Upstream executor producing the stream of chunks/barriers/watermarks.
    input: Executor,

    // Output schema; its data types are used to deserialize rows when building output chunks.
    schema: Schema,

    // The storage-backed table this executor writes the materialized rows into.
    state_table: StateTableInner<S, SD>,

    /// Columns of arrange keys (including pk, group keys, join keys, etc.)
    arrange_key_indices: Vec<usize>,

    actor_context: ActorContextRef,

    // LRU cache of pk -> latest row, used to detect and resolve pk conflicts.
    materialize_cache: MaterializeCache<SD>,

    // How to resolve rows with duplicate primary keys (overwrite / ignore / etc.).
    conflict_behavior: ConflictBehavior,

    // Index of the version column, if any; used to decide whether a new row
    // should replace the cached old row on conflict.
    version_column_index: Option<u32>,

    // Whether any downstream fragment may consume this MV; affects op consistency level.
    may_have_downstream: bool,

    // Ids of subscriptions that depend on this table; non-empty forces log store.
    depended_subscription_ids: HashSet<u32>,

    metrics: MaterializeMetrics,
}
69
70fn get_op_consistency_level(
71    conflict_behavior: ConflictBehavior,
72    may_have_downstream: bool,
73    depended_subscriptions: &HashSet<u32>,
74) -> StateTableOpConsistencyLevel {
75    if !depended_subscriptions.is_empty() {
76        StateTableOpConsistencyLevel::LogStoreEnabled
77    } else if !may_have_downstream && matches!(conflict_behavior, ConflictBehavior::Overwrite) {
78        // Table with overwrite conflict behavior could disable conflict check
79        // if no downstream mv depends on it, so we use a inconsistent_op to skip sanity check as well.
80        StateTableOpConsistencyLevel::Inconsistent
81    } else {
82        StateTableOpConsistencyLevel::ConsistentOldValue
83    }
84}
85
impl<S: StateStore, SD: ValueRowSerde> MaterializeExecutor<S, SD> {
    /// Create a new `MaterializeExecutor` with distribution specified with `distribution_keys` and
    /// `vnodes`. For singleton distribution, `distribution_keys` should be empty and `vnodes`
    /// should be `None`.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        input: Executor,
        schema: Schema,
        store: S,
        arrange_key: Vec<ColumnOrder>,
        actor_context: ActorContextRef,
        vnodes: Option<Arc<Bitmap>>,
        table_catalog: &Table,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
        version_column_index: Option<u32>,
        metrics: Arc<StreamingMetrics>,
    ) -> Self {
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();

        // Serde over the catalog's `value_indices`; used by the materialize cache
        // to (de)serialize whole rows during conflict resolution.
        let row_serde: BasicSerde = BasicSerde::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.into_boxed_slice()),
        );

        let arrange_key_indices: Vec<usize> = arrange_key.iter().map(|k| k.column_index).collect();
        // A non-zero dispatcher count at creation means some downstream may consume this MV.
        let may_have_downstream = actor_context.initial_dispatch_num != 0;
        let depended_subscription_ids = actor_context
            .related_subscriptions
            .get(&TableId::new(table_catalog.id))
            .cloned()
            .unwrap_or_default();
        let op_consistency_level = get_op_consistency_level(
            conflict_behavior,
            may_have_downstream,
            &depended_subscription_ids,
        );
        // Note: The current implementation could potentially trigger a switch on the inconsistent_op flag. If the storage relies on this flag to perform optimizations, it would be advisable to maintain consistency with it throughout the lifecycle.
        let state_table = StateTableInner::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            op_consistency_level,
        )
        .await;

        let mv_metrics = metrics.new_materialize_metrics(
            TableId::new(table_catalog.id),
            actor_context.id,
            actor_context.fragment_id,
        );

        let metrics_info =
            MetricsInfo::new(metrics, table_catalog.id, actor_context.id, "Materialize");

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices,
            actor_context,
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                metrics_info,
                row_serde,
                version_column_index,
            ),
            conflict_behavior,
            version_column_index,
            may_have_downstream,
            depended_subscription_ids,
            metrics: mv_metrics,
        }
    }

    /// Main loop: materializes incoming chunks into the state table, resolving
    /// pk conflicts per `conflict_behavior`, and commits the table on every barrier.
    #[try_stream(ok = Message, error = StreamExecutorError)]
    async fn execute_inner(mut self) {
        let mv_table_id = TableId::new(self.state_table.table_id());

        let data_types = self.schema.data_types();
        let mut input = self.input.execute();

        let barrier = expect_first_barrier(&mut input).await?;
        let first_epoch = barrier.epoch;
        // The first barrier message should be propagated.
        yield Message::Barrier(barrier);
        self.state_table.init_epoch(first_epoch).await?;

        #[for_await]
        for msg in input {
            let msg = msg?;
            // Evict stale cache entries on every message, driven by the watermark epoch.
            self.materialize_cache.evict();

            yield match msg {
                Message::Watermark(w) => Message::Watermark(w),
                Message::Chunk(chunk) => {
                    self.metrics
                        .materialize_input_row_count
                        .inc_by(chunk.cardinality() as u64);

                    // This is an optimization that handles conflicts only when a particular materialized view downstream has no MV dependencies.
                    // This optimization is applied only when there is no specified version column and the is_consistent_op flag of the state table is false,
                    // and the conflict behavior is overwrite.
                    let do_not_handle_conflict = !self.state_table.is_consistent_op()
                        && self.version_column_index.is_none()
                        && self.conflict_behavior == ConflictBehavior::Overwrite;
                    match self.conflict_behavior {
                        ConflictBehavior::Overwrite
                        | ConflictBehavior::IgnoreConflict
                        | ConflictBehavior::DoUpdateIfNotNull
                            if !do_not_handle_conflict =>
                        {
                            if chunk.cardinality() == 0 {
                                // empty chunk
                                continue;
                            }
                            let (data_chunk, ops) = chunk.into_parts();

                            if self.state_table.value_indices().is_some() {
                                // TODO(st1page): when materialize partial columns(), we should
                                // construct some columns in the pk
                                panic!(
                                    "materialize executor with data check can not handle only materialize partial columns"
                                )
                            };
                            let values = data_chunk.serialize();

                            let key_chunk = data_chunk.project(self.state_table.pk_indices());

                            // Serialize the pk of every visible row; invisible rows ("holes")
                            // keep an empty key and are filtered out below via `vis`.
                            let pks = {
                                let mut pks = vec![vec![]; data_chunk.capacity()];
                                key_chunk
                                    .rows_with_holes()
                                    .zip_eq_fast(pks.iter_mut())
                                    .for_each(|(r, vnode_and_pk)| {
                                        if let Some(r) = r {
                                            self.state_table.pk_serde().serialize(r, vnode_and_pk);
                                        }
                                    });
                                pks
                            };
                            let (_, vis) = key_chunk.into_parts();
                            let row_ops = ops
                                .iter()
                                .zip_eq_debug(pks.into_iter())
                                .zip_eq_debug(values.into_iter())
                                .zip_eq_debug(vis.iter())
                                .filter_map(|(((op, k), v), vis)| vis.then_some((*op, k, v)))
                                .collect_vec();

                            // Resolve conflicts against the cache/state table to get the
                            // deduplicated set of changes to apply and emit.
                            let fixed_changes = self
                                .materialize_cache
                                .handle(
                                    row_ops,
                                    &self.state_table,
                                    &self.conflict_behavior,
                                    &self.metrics,
                                )
                                .await?;

                            match generate_output(fixed_changes, data_types.clone())? {
                                Some(output_chunk) => {
                                    self.state_table.write_chunk(output_chunk.clone());
                                    self.state_table.try_flush().await?;
                                    Message::Chunk(output_chunk)
                                }
                                None => continue,
                            }
                        }
                        // `IgnoreConflict` always takes the guarded arm above, since
                        // `do_not_handle_conflict` can only be true for `Overwrite`.
                        ConflictBehavior::IgnoreConflict => unreachable!(),
                        ConflictBehavior::NoCheck
                        | ConflictBehavior::Overwrite
                        | ConflictBehavior::DoUpdateIfNotNull => {
                            self.state_table.write_chunk(chunk.clone());
                            self.state_table.try_flush().await?;
                            Message::Chunk(chunk)
                        } // ConflictBehavior::DoUpdateIfNotNull => unimplemented!(),
                    }
                }
                Message::Barrier(b) => {
                    // If a downstream mv depends on the current table, we need to do conflict check again.
                    if !self.may_have_downstream
                        && b.has_more_downstream_fragments(self.actor_context.id)
                    {
                        self.may_have_downstream = true;
                    }
                    Self::may_update_depended_subscriptions(
                        &mut self.depended_subscription_ids,
                        &b,
                        mv_table_id,
                    );
                    // Re-evaluate the consistency level: downstream/subscription changes
                    // may require switching between consistent and inconsistent ops.
                    let op_consistency_level = get_op_consistency_level(
                        self.conflict_behavior,
                        self.may_have_downstream,
                        &self.depended_subscription_ids,
                    );
                    let post_commit = self
                        .state_table
                        .commit_may_switch_consistent_op(b.epoch, op_consistency_level)
                        .await?;
                    if !post_commit.inner().is_consistent_op() {
                        assert_eq!(self.conflict_behavior, ConflictBehavior::Overwrite);
                    }

                    let update_vnode_bitmap = b.as_update_vnode_bitmap(self.actor_context.id);
                    let b_epoch = b.epoch;
                    yield Message::Barrier(b);

                    // Update the vnode bitmap for the state table if asked.
                    if let Some((_, cache_may_stale)) =
                        post_commit.post_yield_barrier(update_vnode_bitmap).await?
                    {
                        if cache_may_stale {
                            self.materialize_cache.data.clear();
                        }
                    }

                    self.metrics
                        .materialize_current_epoch
                        .set(b_epoch.curr as i64);

                    continue;
                }
            }
        }
    }

    /// Apply the barrier's subscription mutations to `depended_subscriptions`:
    /// insert newly added subscribers on `mv_table_id` and remove dropped ones,
    /// warning on duplicate adds or missing drops.
    /// (The previous doc "return true when changed" was stale — the function returns `()`.)
    fn may_update_depended_subscriptions(
        depended_subscriptions: &mut HashSet<u32>,
        barrier: &Barrier,
        mv_table_id: TableId,
    ) {
        for subscriber_id in barrier.added_subscriber_on_mv_table(mv_table_id) {
            if !depended_subscriptions.insert(subscriber_id) {
                warn!(
                    ?depended_subscriptions,
                    ?mv_table_id,
                    subscriber_id,
                    "subscription id already exists"
                );
            }
        }

        if let Some(Mutation::DropSubscriptions {
            subscriptions_to_drop,
        }) = barrier.mutation.as_deref()
        {
            for (subscriber_id, upstream_mv_table_id) in subscriptions_to_drop {
                if *upstream_mv_table_id == mv_table_id
                    && !depended_subscriptions.remove(subscriber_id)
                {
                    warn!(
                        ?depended_subscriptions,
                        ?mv_table_id,
                        subscriber_id,
                        "drop non existing subscriber_id id"
                    );
                }
            }
        }
    }
}
353
impl<S: StateStore> MaterializeExecutor<S, BasicSerde> {
    /// Create a new `MaterializeExecutor` without distribution info for test purpose.
    #[allow(clippy::too_many_arguments)]
    pub async fn for_test(
        input: Executor,
        store: S,
        table_id: TableId,
        keys: Vec<ColumnOrder>,
        column_ids: Vec<ColumnId>,
        watermark_epoch: AtomicU64Ref,
        conflict_behavior: ConflictBehavior,
    ) -> Self {
        let arrange_columns: Vec<usize> = keys.iter().map(|k| k.column_index).collect();
        let arrange_order_types = keys.iter().map(|k| k.order_type).collect();
        let schema = input.schema().clone();
        // Build unnamed column descriptors from the input schema's data types.
        let columns: Vec<ColumnDesc> = column_ids
            .into_iter()
            .zip_eq_fast(schema.fields.iter())
            .map(|(column_id, field)| ColumnDesc::unnamed(column_id, field.data_type()))
            .collect_vec();

        // In tests, every column is a value column (indices 0..len).
        let row_serde = BasicSerde::new(
            Arc::from((0..columns.len()).collect_vec()),
            Arc::from(columns.clone().into_boxed_slice()),
        );
        let state_table = StateTableInner::from_table_catalog(
            &gen_pbtable(
                table_id,
                columns,
                arrange_order_types,
                arrange_columns.clone(),
                0,
            ),
            store,
            None,
        )
        .await;

        let metrics = StreamingMetrics::unused().new_materialize_metrics(table_id, 1, 2);

        Self {
            input,
            schema,
            state_table,
            arrange_key_indices: arrange_columns.clone(),
            actor_context: ActorContext::for_test(0),
            materialize_cache: MaterializeCache::new(
                watermark_epoch,
                MetricsInfo::for_test(),
                row_serde,
                None,
            ),
            conflict_behavior,
            version_column_index: None,
            // Force the conflict-handling path in tests.
            may_have_downstream: true,
            depended_subscription_ids: HashSet::new(),
            metrics,
        }
    }
}
414
415/// Construct output `StreamChunk` from given buffer.
416fn generate_output(
417    changes: MaterializeBuffer,
418    data_types: Vec<DataType>,
419) -> StreamExecutorResult<Option<StreamChunk>> {
420    // construct output chunk
421    // TODO(st1page): when materialize partial columns(), we should construct some columns in the pk
422    let mut new_ops: Vec<Op> = vec![];
423    let mut new_rows: Vec<Bytes> = vec![];
424    let row_deserializer = RowDeserializer::new(data_types.clone());
425    for (_, row_op) in changes.into_parts() {
426        match row_op {
427            KeyOp::Insert(value) => {
428                new_ops.push(Op::Insert);
429                new_rows.push(value);
430            }
431            KeyOp::Delete(old_value) => {
432                new_ops.push(Op::Delete);
433                new_rows.push(old_value);
434            }
435            KeyOp::Update((old_value, new_value)) => {
436                // if old_value == new_value, we don't need to emit updates to downstream.
437                if old_value != new_value {
438                    new_ops.push(Op::UpdateDelete);
439                    new_ops.push(Op::UpdateInsert);
440                    new_rows.push(old_value);
441                    new_rows.push(new_value);
442                }
443            }
444        }
445    }
446    let mut data_chunk_builder = DataChunkBuilder::new(data_types, new_rows.len() + 1);
447
448    for row_bytes in new_rows {
449        let res =
450            data_chunk_builder.append_one_row(row_deserializer.deserialize(row_bytes.as_ref())?);
451        debug_assert!(res.is_none());
452    }
453
454    if let Some(new_data_chunk) = data_chunk_builder.consume_all() {
455        let new_stream_chunk = StreamChunk::new(new_ops, new_data_chunk.columns().to_vec());
456        Ok(Some(new_stream_chunk))
457    } else {
458        Ok(None)
459    }
460}
461
/// `MaterializeBuffer` is a buffer to handle chunk into `KeyOp`.
pub struct MaterializeBuffer {
    // Maps serialized pk -> the net `KeyOp` accumulated for that key.
    buffer: HashMap<Vec<u8>, KeyOp>,
}
466
467impl MaterializeBuffer {
468    fn new() -> Self {
469        Self {
470            buffer: HashMap::new(),
471        }
472    }
473
474    pub fn insert(&mut self, pk: Vec<u8>, value: Bytes) {
475        let entry = self.buffer.entry(pk);
476        match entry {
477            Entry::Vacant(e) => {
478                e.insert(KeyOp::Insert(value));
479            }
480            Entry::Occupied(mut e) => {
481                if let KeyOp::Delete(old_value) = e.get_mut() {
482                    let old_val = std::mem::take(old_value);
483                    e.insert(KeyOp::Update((old_val, value)));
484                } else {
485                    unreachable!();
486                }
487            }
488        }
489    }
490
491    pub fn delete(&mut self, pk: Vec<u8>, old_value: Bytes) {
492        let entry: Entry<'_, Vec<u8>, KeyOp> = self.buffer.entry(pk);
493        match entry {
494            Entry::Vacant(e) => {
495                e.insert(KeyOp::Delete(old_value));
496            }
497            Entry::Occupied(mut e) => match e.get_mut() {
498                KeyOp::Insert(_) => {
499                    e.remove();
500                }
501                KeyOp::Update((prev, _curr)) => {
502                    let prev = std::mem::take(prev);
503                    e.insert(KeyOp::Delete(prev));
504                }
505                KeyOp::Delete(_) => {
506                    unreachable!();
507                }
508            },
509        }
510    }
511
512    pub fn update(&mut self, pk: Vec<u8>, old_value: Bytes, new_value: Bytes) {
513        let entry = self.buffer.entry(pk);
514        match entry {
515            Entry::Vacant(e) => {
516                e.insert(KeyOp::Update((old_value, new_value)));
517            }
518            Entry::Occupied(mut e) => match e.get_mut() {
519                KeyOp::Insert(_) => {
520                    e.insert(KeyOp::Insert(new_value));
521                }
522                KeyOp::Update((_prev, curr)) => {
523                    *curr = new_value;
524                }
525                KeyOp::Delete(_) => {
526                    unreachable!()
527                }
528            },
529        }
530    }
531
532    pub fn into_parts(self) -> HashMap<Vec<u8>, KeyOp> {
533        self.buffer
534    }
535}
// Boxed entry point: drives `execute_inner` as the executor's message stream.
impl<S: StateStore, SD: ValueRowSerde> Execute for MaterializeExecutor<S, SD> {
    fn execute(self: Box<Self>) -> BoxedMessageStream {
        self.execute_inner().boxed()
    }
}
541
// Intentionally terse: only the arrange key indices are shown, since the
// remaining fields (state table, cache, metrics) are not meaningfully printable.
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for MaterializeExecutor<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MaterializeExecutor")
            .field("arrange_key_indices", &self.arrange_key_indices)
            .finish()
    }
}
549
/// A cache for materialize executors.
pub struct MaterializeCache<SD> {
    // LRU map from serialized pk to the cached row (`None` = known absent).
    data: ManagedLruCache<Vec<u8>, CacheValue>,
    // Serde for whole rows, used during conflict resolution.
    row_serde: BasicSerde,
    // Index of the version column, if any (see `should_handle_conflict`).
    version_column_index: Option<u32>,
    // Ties the cache to the state table's value serde type parameter.
    _serde: PhantomData<SD>,
}
557
/// Cached row for a pk; `None` records that the key is absent from the table.
type CacheValue = Option<CompactedRow>;
559
560impl<SD: ValueRowSerde> MaterializeCache<SD> {
561    pub fn new(
562        watermark_sequence: AtomicU64Ref,
563        metrics_info: MetricsInfo,
564        row_serde: BasicSerde,
565        version_column_index: Option<u32>,
566    ) -> Self {
567        let cache: ManagedLruCache<Vec<u8>, CacheValue> =
568            ManagedLruCache::unbounded(watermark_sequence, metrics_info.clone());
569        Self {
570            data: cache,
571            row_serde,
572            version_column_index,
573            _serde: PhantomData,
574        }
575    }
576
577    pub async fn handle<S: StateStore>(
578        &mut self,
579        row_ops: Vec<(Op, Vec<u8>, Bytes)>,
580        table: &StateTableInner<S, SD>,
581        conflict_behavior: &ConflictBehavior,
582        metrics: &MaterializeMetrics,
583    ) -> StreamExecutorResult<MaterializeBuffer> {
584        let key_set: HashSet<Box<[u8]>> = row_ops
585            .iter()
586            .map(|(_, k, _)| k.as_slice().into())
587            .collect();
588        self.fetch_keys(
589            key_set.iter().map(|v| v.deref()),
590            table,
591            conflict_behavior,
592            metrics,
593        )
594        .await?;
595        let mut fixed_changes = MaterializeBuffer::new();
596        let row_serde = self.row_serde.clone();
597        let version_column_index = self.version_column_index;
598        for (op, key, value) in row_ops {
599            let mut update_cache = false;
600            let fixed_changes = &mut fixed_changes;
601            // When you change the fixed_changes buffer, you must mark `update_cache` to true so the cache and downstream can be consistent.
602            let fixed_changes = || {
603                update_cache = true;
604                fixed_changes
605            };
606            match op {
607                Op::Insert | Op::UpdateInsert => {
608                    match conflict_behavior {
609                        ConflictBehavior::Overwrite => {
610                            match self.force_get(&key) {
611                                Some(old_row) => {
612                                    let mut handle_conflict = true;
613                                    if let Some(idx) = version_column_index {
614                                        let old_row_deserialized = row_serde
615                                            .deserializer
616                                            .deserialize(old_row.row.clone())?;
617                                        let new_row_deserialized =
618                                            row_serde.deserializer.deserialize(value.clone())?;
619
620                                        handle_conflict = should_handle_conflict(
621                                            old_row_deserialized.index(idx as usize),
622                                            new_row_deserialized.index(idx as usize),
623                                        );
624                                    }
625                                    if handle_conflict {
626                                        fixed_changes().update(
627                                            key.clone(),
628                                            old_row.row.clone(),
629                                            value.clone(),
630                                        )
631                                    } else {
632                                        update_cache = false;
633                                    }
634                                }
635                                None => fixed_changes().insert(key.clone(), value.clone()),
636                            };
637                        }
638                        ConflictBehavior::IgnoreConflict => {
639                            match self.force_get(&key) {
640                                Some(_) => (),
641                                None => {
642                                    fixed_changes().insert(key.clone(), value.clone());
643                                }
644                            };
645                        }
646
647                        ConflictBehavior::DoUpdateIfNotNull => {
648                            match self.force_get(&key) {
649                                Some(old_row) => {
650                                    // In this section, we compare the new row and old row column by column and perform `DoUpdateIfNotNull` replacement.
651                                    // todo(wcy-fdu):find a way to output the resulting new row directly to the downstream chunk, thus avoiding an additional deserialization step.
652                                    let mut old_row_deserialized_vec = row_serde
653                                        .deserializer
654                                        .deserialize(old_row.row.clone())?
655                                        .into_inner()
656                                        .into_vec();
657                                    let new_row_deserialized =
658                                        row_serde.deserializer.deserialize(value.clone())?;
659                                    let mut handle_conflict = true;
660                                    if let Some(idx) = version_column_index {
661                                        handle_conflict = should_handle_conflict(
662                                            &old_row_deserialized_vec[idx as usize],
663                                            new_row_deserialized.index(idx as usize),
664                                        );
665                                    }
666                                    if handle_conflict {
667                                        execute_do_update_if_not_null_replacement(
668                                            &mut old_row_deserialized_vec,
669                                            new_row_deserialized,
670                                        );
671                                        let new_row = OwnedRow::new(old_row_deserialized_vec);
672                                        let new_row_bytes =
673                                            Bytes::from(row_serde.serializer.serialize(new_row));
674                                        fixed_changes().update(
675                                            key.clone(),
676                                            old_row.row.clone(),
677                                            new_row_bytes.clone(),
678                                        );
679                                        // Since `DoUpdateIfNotNull` may generate values that differ from both the new row and old row,
680                                        // an update cache needs to be done here.
681                                        self.data.push(
682                                            key.clone(),
683                                            Some(CompactedRow { row: new_row_bytes }),
684                                        );
685                                    }
686                                    update_cache = false;
687                                }
688                                None => {
689                                    fixed_changes().insert(key.clone(), value.clone());
690                                    update_cache = true;
691                                }
692                            };
693                        }
694                        _ => unreachable!(),
695                    };
696
697                    if update_cache {
698                        match conflict_behavior {
699                            ConflictBehavior::Overwrite
700                            | ConflictBehavior::IgnoreConflict
701                            | ConflictBehavior::DoUpdateIfNotNull => {
702                                self.data.push(key, Some(CompactedRow { row: value }));
703                            }
704
705                            _ => unreachable!(),
706                        }
707                    }
708                }
709
710                Op::Delete | Op::UpdateDelete => {
711                    match conflict_behavior {
712                        ConflictBehavior::Overwrite
713                        | ConflictBehavior::IgnoreConflict
714                        | ConflictBehavior::DoUpdateIfNotNull => {
715                            // delete a nonexistent value
716                            if let Some(old_row) = self.force_get(&key) {
717                                fixed_changes().delete(key.clone(), old_row.row.clone());
718                            };
719                        }
720                        _ => unreachable!(),
721                    };
722
723                    if update_cache {
724                        self.data.push(key, None);
725                    }
726                }
727            }
728        }
729        Ok(fixed_changes)
730    }
731
732    async fn fetch_keys<'a, S: StateStore>(
733        &mut self,
734        keys: impl Iterator<Item = &'a [u8]>,
735        table: &StateTableInner<S, SD>,
736        conflict_behavior: &ConflictBehavior,
737        metrics: &MaterializeMetrics,
738    ) -> StreamExecutorResult<()> {
739        let mut futures = vec![];
740        for key in keys {
741            metrics.materialize_cache_total_count.inc();
742
743            if self.data.contains(key) {
744                metrics.materialize_cache_hit_count.inc();
745                continue;
746            }
747            futures.push(async {
748                let key_row = table.pk_serde().deserialize(key).unwrap();
749                let row = table.get_row(key_row).await?.map(CompactedRow::from);
750                StreamExecutorResult::Ok((key.to_vec(), row))
751            });
752        }
753
754        let mut buffered = stream::iter(futures).buffer_unordered(10).fuse();
755        while let Some(result) = buffered.next().await {
756            let (key, value) = result?;
757            match conflict_behavior {
758                ConflictBehavior::Overwrite | ConflictBehavior::DoUpdateIfNotNull => {
759                    self.data.push(key, value)
760                }
761                ConflictBehavior::IgnoreConflict => self.data.push(key, value),
762                _ => unreachable!(),
763            };
764        }
765
766        Ok(())
767    }
768
769    pub fn force_get(&mut self, key: &[u8]) -> &CacheValue {
770        self.data.get(key).unwrap_or_else(|| {
771            panic!(
772                "the key {:?} has not been fetched in the materialize executor's cache ",
773                key
774            )
775        })
776    }
777
    /// Evicts stale entries from the underlying managed LRU cache.
    fn evict(&mut self) {
        self.data.evict()
    }
781}
782
783/// Generates a new row with the behavior of "do update if not null" by replacing columns in the old row with non-empty columns in the new row.
784///
785/// # Arguments
786///
787/// * `old_row` - The old row containing the original values.
788/// * `new_row` - The new row containing the replacement values.
789///
790/// # Example
791///
792/// ```no_run
793/// let old_row = vec![Some(1), None, Some(3)];
794/// let mut new_row = vec![Some(10), Some(20), None];
795///
796///
797/// // After the execute_do_update_if_not_null_replacement function call, old_row will be [Some(10), Some(20), Some(3)].
798/// ```
799fn execute_do_update_if_not_null_replacement(
800    old_row: &mut Vec<Option<ScalarImpl>>,
801    new_row: OwnedRow,
802) {
803    for (old_col, new_col) in old_row.iter_mut().zip_eq_fast(new_row) {
804        if let Some(new_value) = new_col {
805            *old_col = Some(new_value);
806        }
807    }
808}
809/// Determines whether pk conflict should be handled based on the version column of the new and old rows.
810///
811/// # Arguments
812///
813/// * `old_version_column`: The version column value of the old row.
814/// * `new_version_column`: The version column value of the new row.
815///
816/// # Returns
817///
818/// A boolean value indicating whether a conflict should be handled.
819///
820/// If both the new row's version column and the old row's version column are Some, the function will return true if the new row's version is greater than or equal to the old row's version.
821/// If the new row's version column is None and the old row's version column is Some, do not handle pk conflict.
822/// If both the new row's version column and the old row's version column are None, should handle pk conflict.
823fn should_handle_conflict(
824    old_version_column: &Option<ScalarImpl>,
825    new_version_column: &Option<ScalarImpl>,
826) -> bool {
827    match (old_version_column, new_version_column) {
828        (None, None) => true,
829        (None, Some(_)) => true,
830        (Some(_), None) => false,
831        (Some(old_version_col), Some(new_version_col)) => !matches!(
832            old_version_col.default_cmp(new_version_col),
833            Ordering::Greater
834        ),
835    }
836}
837
838#[cfg(test)]
839mod tests {
840
841    use std::iter;
842    use std::sync::atomic::AtomicU64;
843
844    use rand::rngs::SmallRng;
845    use rand::{Rng, RngCore, SeedableRng};
846    use risingwave_common::array::stream_chunk::{StreamChunkMut, StreamChunkTestExt};
847    use risingwave_common::catalog::Field;
848    use risingwave_common::util::epoch::test_epoch;
849    use risingwave_common::util::sort_util::OrderType;
850    use risingwave_hummock_sdk::HummockReadEpoch;
851    use risingwave_storage::memory::MemoryStateStore;
852    use risingwave_storage::table::batch_table::BatchTable;
853
854    use super::*;
855    use crate::executor::test_utils::*;
856
    #[tokio::test]
    /// End-to-end test of `MaterializeExecutor` with `ConflictBehavior::NoCheck`:
    /// rows flowing through the executor must be readable from the backing
    /// state table once the epoch's barrier has been processed.
    async fn test_materialize_executor() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Prepare source chunks.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6",
        );
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back what the
        // executor materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::NoCheck,
        )
        .await
        .boxed()
        .execute();
        // Consume the init barrier.
        materialize_executor.next().await.transpose().unwrap();

        // Consume the first chunk.
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
959
960    // https://github.com/risingwavelabs/risingwave/issues/13346
    #[tokio::test]
    /// With `ConflictBehavior::Overwrite`, overwriting a pk and then deleting it
    /// within the same chunk must leave the pk absent from storage.
    async fn test_upsert_stream() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // Insert (1, 1) first; the next chunk will overwrite and then delete this pk.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 1",
        );

        // Overwrite (1, 1) with (1, 2), then delete (1, 2): pk 1 must end up absent.
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 2
            - 1 2",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back the result.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier, chunk1, barrier, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                // The delete in chunk2 must win: pk 1 is gone.
                assert!(row.is_none());
            }
            _ => unreachable!(),
        }
    }
1042
    #[tokio::test]
    /// With `ConflictBehavior::Overwrite`, a later insert on an existing pk must
    /// replace the previously materialized row.
    async fn test_check_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to override the former.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 2 6",
        );

        // Re-insert an existing pk with a new value in a later epoch
        // (its effect is not asserted below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 4",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back the result.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier, chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: chunk2's (1, 3) overwrote chunk1's values.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: chunk2's (2, 6) overwrote chunk1's (2, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(6_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1161
    #[tokio::test]
    /// With `ConflictBehavior::Overwrite`, exercises deletes and updates that
    /// conflict with existing state: deleting with a stale value, deleting a
    /// nonexistent pk, updating with a stale value, and updating a nonexistent
    /// pk (which should degrade to an insert).
    async fn test_delete_and_update_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // update followed by insert on the same pk; the final insert (8, 3)
        // wins under `Overwrite`.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete with a stale value (-3 4), delete a nonexistent pk (-5 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test insert on an existing pk, update with a stale value (U- 2 4),
        // and update a nonexistent pk (U- 9 0 / U+ 9 1), which should become
        // an insert.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back the result.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::Overwrite,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier and chunk1.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 3), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(3_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        materialize_executor.next().await.transpose().unwrap();

        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        materialize_executor.next().await.transpose().unwrap();
        // Second stream chunk. We check the existence of (7) -> (7,8)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(5_i32.into())]))
                );

                // check update wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1348
    #[tokio::test]
    /// With `ConflictBehavior::IgnoreConflict`, a later insert on an existing pk
    /// must be dropped and the first materialized row kept.
    async fn test_ignore_insert_conflict() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter needs to be ignored.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            + 1 4
            + 2 5
            + 3 6",
        );

        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 2 6",
        );

        // insert a conflicting pk again in a later epoch; it should also be
        // ignored (its effect is not asserted below).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back the result.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Drain init barrier, chunk1, chunk2.
        materialize_executor.next().await.transpose().unwrap();

        materialize_executor.next().await.transpose().unwrap();
        materialize_executor.next().await.transpose().unwrap();

        // First stream chunk. We check the existence of (3) -> (3,6)
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(3_i32.into()), Some(6_i32.into())]))
                );

                // pk 1: the very first insert (1, 3) is kept; all later
                // conflicting inserts were ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(3_i32.into())]))
                );

                // pk 2: the first insert (2, 5) is kept; chunk2's (2, 6) was ignored.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(5_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1467
    #[tokio::test]
    /// With `ConflictBehavior::IgnoreConflict`, an insert on a pk that was just
    /// deleted in the same chunk is not a conflict and must succeed.
    async fn test_ignore_delete_then_insert() {
        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test insert after delete one pk, the latter insert should succeed.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 3
            - 1 3
            + 1 6",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch table over the same state store, used to read back the result.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Expect the message sequence: init barrier, chunk, barrier.
        let _msg1 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();
        let _msg2 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_chunk()
            .unwrap();
        let _msg3 = materialize_executor
            .next()
            .await
            .transpose()
            .unwrap()
            .unwrap()
            .as_barrier()
            .unwrap();

        // The trailing insert (1, 6) must be visible after the barrier.
        let row = table
            .get_row(
                &OwnedRow::new(vec![Some(1_i32.into())]),
                HummockReadEpoch::NoWait(u64::MAX),
            )
            .await
            .unwrap();
        assert_eq!(
            row,
            Some(OwnedRow::new(vec![Some(1_i32.into()), Some(6_i32.into())]))
        );
    }
1560
    #[tokio::test]
    async fn test_ignore_delete_and_update_conflict() {
        // Verifies `ConflictBehavior::IgnoreConflict`: when an incoming row conflicts on
        // the primary key, the already-materialized row wins and the new write is dropped.
        // Deletes/updates are matched by pk only, so a delete of an absent pk is a no-op
        // and an update of an absent pk degrades to an insert.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // test double insert one pk, the latter should be ignored:
        // `U+ 8 2` sets (8, 2), then the conflicting `+ 8 3` must be dropped.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 5
            + 3 6
            U- 8 1
            U+ 8 2
            + 8 3",
        );

        // test delete with a wrong old value (`- 3 4`; the stored row is (3, 6))
        // and delete of an inexistent pk (`- 5 0`).
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // test conflicting insert (`+ 1 5` vs stored (1, 4)), update with a wrong old
        // value (`U- 2 4`; stored row is (2, 5)), and update of an inexistent pk
        // (`U- 9 0` / `U+ 9 1`, which should behave like an insert).
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            U- 2 4
            U+ 2 8
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same state store, used below to assert what was
        // actually materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::IgnoreConflict,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier (epoch 1).
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after the first chunk: its writes are now readable.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // can read (8, 2), check insert after update
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after the second chunk: verify the deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // (7, 8) was a plain, non-conflicting insert and must be present.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after the third chunk: verify conflict resolution of the last chunk.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // The conflicting `+ 1 5` was ignored; (1, 4) from chunk1 survives.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(1_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(1_i32.into()), Some(4_i32.into())]))
                );

                // check update wrong value: the update applies by pk even though the
                // `U-` old value (2, 4) did not match the stored (2, 5).
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(2_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1748
    #[tokio::test]
    async fn test_do_update_if_not_null_conflict() {
        // Verifies `ConflictBehavior::DoUpdateIfNotNull`: on a pk conflict, only the
        // non-NULL columns of the incoming row overwrite the stored row, so a NULL cell
        // never clobbers an existing value.

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        // should get (8, 2): the conflicting `+ 8 .` carries a NULL value column, so it
        // must not overwrite the 2 written by `U+ 8 2`. Pk 2 is inserted with NULL.
        let chunk1 = StreamChunk::from_pretty(
            " i i
            + 1 4
            + 2 .
            + 3 6
            U- 8 .
            U+ 8 2
            + 8 .",
        );

        // should not get (3, x), should not get (5, 0)
        let chunk2 = StreamChunk::from_pretty(
            " i i
            + 7 8
            - 3 4
            - 5 0",
        );

        // should get (2, None), (7, 8): NULL cells in `+ 7 .` and `U+ 2 .` leave the
        // stored values untouched; `U+ 9 1` on an absent pk acts as an insert.
        let chunk3 = StreamChunk::from_pretty(
            " i i
            + 1 5
            + 7 .
            U- 2 4
            U+ 2 .
            U- 9 0
            U+ 9 1",
        );

        // Prepare stream executors.
        let source = MockSource::with_messages(vec![
            Message::Barrier(Barrier::new_test_barrier(test_epoch(1))),
            Message::Chunk(chunk1),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(2))),
            Message::Chunk(chunk2),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(3))),
            Message::Chunk(chunk3),
            Message::Barrier(Barrier::new_test_barrier(test_epoch(4))),
        ])
        .into_executor(schema.clone(), PkIndices::new());

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(column_ids[0], DataType::Int32),
            ColumnDesc::unnamed(column_ids[1], DataType::Int32),
        ];

        // Batch-read handle over the same state store, used below to assert what was
        // actually materialized.
        let table = BatchTable::for_test(
            memory_state_store.clone(),
            table_id,
            column_descs,
            order_types,
            vec![0],
            vec![0, 1],
        );

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store,
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            ConflictBehavior::DoUpdateIfNotNull,
        )
        .await
        .boxed()
        .execute();
        // Consume the initial barrier (epoch 1).
        materialize_executor.next().await.transpose().unwrap();

        // Consume chunk1.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after the first chunk: NULL cells must not have overwritten values.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(8_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(8_i32.into()), Some(2_i32.into())]))
                );

                // A fresh insert with a NULL column stores the NULL as-is.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));
            }
            _ => unreachable!(),
        }
        // Consume chunk2.
        materialize_executor.next().await.transpose().unwrap();

        // Barrier after the second chunk: verify the deletes.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check delete wrong value
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(3_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);

                // check delete wrong pk
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(5_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, None);
            }
            _ => unreachable!(),
        }

        // Consume chunk3.
        materialize_executor.next().await.transpose().unwrap();
        // Barrier after the third chunk: verify NULL-preserving conflict resolution.
        match materialize_executor.next().await.transpose().unwrap() {
            Some(Message::Barrier(_)) => {
                // `+ 7 .` conflicted with (7, 8); the NULL column kept the stored 8.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(7_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(7_i32.into()), Some(8_i32.into())]))
                );

                // check update wrong value: `U+ 2 .` leaves the stored NULL in place.
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(2_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(row, Some(OwnedRow::new(vec![Some(2_i32.into()), None])));

                // check update wrong pk, should become insert
                let row = table
                    .get_row(
                        &OwnedRow::new(vec![Some(9_i32.into())]),
                        HummockReadEpoch::NoWait(u64::MAX),
                    )
                    .await
                    .unwrap();
                assert_eq!(
                    row,
                    Some(OwnedRow::new(vec![Some(9_i32.into()), Some(1_i32.into())]))
                );
            }
            _ => unreachable!(),
        }
    }
1942
1943    fn gen_fuzz_data(row_number: usize, chunk_size: usize) -> Vec<StreamChunk> {
1944        const KN: u32 = 4;
1945        const SEED: u64 = 998244353;
1946        let mut ret = vec![];
1947        let mut builder =
1948            StreamChunkBuilder::new(chunk_size, vec![DataType::Int32, DataType::Int32]);
1949        let mut rng = SmallRng::seed_from_u64(SEED);
1950
1951        let random_vis = |c: StreamChunk, rng: &mut SmallRng| -> StreamChunk {
1952            let len = c.data_chunk().capacity();
1953            let mut c = StreamChunkMut::from(c);
1954            for i in 0..len {
1955                c.set_vis(i, rng.random_bool(0.5));
1956            }
1957            c.into()
1958        };
1959        for _ in 0..row_number {
1960            let k = (rng.next_u32() % KN) as i32;
1961            let v = rng.next_u32() as i32;
1962            let op = if rng.random_bool(0.5) {
1963                Op::Insert
1964            } else {
1965                Op::Delete
1966            };
1967            if let Some(c) =
1968                builder.append_row(op, OwnedRow::new(vec![Some(k.into()), Some(v.into())]))
1969            {
1970                ret.push(random_vis(c, &mut rng));
1971            }
1972        }
1973        if let Some(c) = builder.take() {
1974            ret.push(random_vis(c, &mut rng));
1975        }
1976        ret
1977    }
1978
    /// Feeds `N` randomly generated insert/delete rows (see `gen_fuzz_data`) through a
    /// `MaterializeExecutor` configured with `conflict_behavior`, then replays every
    /// chunk it emits into a fresh `StateTable` on the same store — the replay is
    /// expected to surface any self-inconsistency in the executor's output stream
    /// (e.g. deleting a row that was never inserted).
    async fn fuzz_test_stream_consistent_inner(conflict_behavior: ConflictBehavior) {
        // Number of random rows to generate.
        const N: usize = 100000;

        // Prepare storage and memtable.
        let memory_state_store = MemoryStateStore::new();
        let table_id = TableId::new(1);
        // Two columns of int32 type, the first column is PK.
        let schema = Schema::new(vec![
            Field::unnamed(DataType::Int32),
            Field::unnamed(DataType::Int32),
        ]);
        let column_ids = vec![0.into(), 1.into()];

        let chunks = gen_fuzz_data(N, 128);
        // Bracket the fuzz chunks with barriers so the executor opens and commits an epoch.
        let messages = iter::once(Message::Barrier(Barrier::new_test_barrier(test_epoch(1))))
            .chain(chunks.into_iter().map(Message::Chunk))
            .chain(iter::once(Message::Barrier(Barrier::new_test_barrier(
                test_epoch(2),
            ))))
            .collect();
        // Prepare stream executors.
        let source =
            MockSource::with_messages(messages).into_executor(schema.clone(), PkIndices::new());

        let mut materialize_executor = MaterializeExecutor::for_test(
            source,
            memory_state_store.clone(),
            table_id,
            vec![ColumnOrder::new(0, OrderType::ascending())],
            column_ids,
            Arc::new(AtomicU64::new(0)),
            conflict_behavior,
        )
        .await
        .boxed()
        .execute();
        // Consume the first barrier before reading chunks.
        materialize_executor.expect_barrier().await;

        let order_types = vec![OrderType::ascending()];
        let column_descs = vec![
            ColumnDesc::unnamed(0.into(), DataType::Int32),
            ColumnDesc::unnamed(1.into(), DataType::Int32),
        ];
        let pk_indices = vec![0];

        // A separate state table (distinct table id 1002) acting as the consistency checker.
        let mut table = StateTable::from_table_catalog(
            &gen_pbtable(
                TableId::from(1002),
                column_descs.clone(),
                order_types,
                pk_indices,
                0,
            ),
            memory_state_store.clone(),
            None,
        )
        .await;

        // Loop ends at the closing barrier (the first non-chunk message).
        while let Message::Chunk(c) = materialize_executor.next().await.unwrap().unwrap() {
            // check with state table's memtable
            table.write_chunk(c);
        }
    }
2042
2043    #[tokio::test]
2044    async fn fuzz_test_stream_consistent_upsert() {
2045        fuzz_test_stream_consistent_inner(ConflictBehavior::Overwrite).await
2046    }
2047
2048    #[tokio::test]
2049    async fn fuzz_test_stream_consistent_ignore() {
2050        fuzz_test_stream_consistent_inner(ConflictBehavior::IgnoreConflict).await
2051    }
2052}