risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::executor::StreamExecutorResult;

/// This macro is used to mark a point where we want to randomly discard the operation and
/// return early, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}
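// Usage note (illustrative): this is invoked at the top of the write paths below
// (`insert` / `delete` / `update` on `StateTableRowStore`), so that in insane mode
// roughly 30% of writes are silently dropped to exercise inconsistency handling.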

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only the rows whose primary-key vnode is in this set are
    /// visible to the executor. The table also checks whether written rows conform to this
    /// partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// The index of the watermark column used for state cleaning, among all columns.
    pub clean_watermark_index: Option<usize>,
    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned on commit.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Serializer and serde type for the watermark column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of all table columns, used to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" refers to the base `state_table`'s actual schema.
    /// "o" refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// so that the schema of the row matches the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices, to use them for the backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` uses `BasicSerde` as the default serde.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

// initialize
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, deadlock is likely to happen.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
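
// Illustrative expansion: a call like
//   dispatch_value_indices!(&self.value_indices, [row], { self.row_store.insert(key, row) })
// expands to `let row = row.project(value_indices); self.row_store.insert(key, row)` when
// `value_indices` is `Some`, and runs the body on the unprojected row otherwise.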

/// `StateTableRowStore` extracts the row-level storage logic from `StateTable`. It serves
/// a purpose similar to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to read
/// (`get` and `iter`) over a serialized key (or key range) returning `OwnedRow`s, and to write
/// (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)` pairs.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
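    /// In-memory mirror of all rows, keyed by vnode; `Some` only when row preloading is
    /// enabled. Kept in sync on every write and reloaded when the vnode bitmap changes.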
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    // should only be used for debugging, in the panic message of `handle_mem_table_error`
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding the e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // `split_off` returns everything at and after the given key,
                                    // which is exactly the part to keep under an ascending watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn `Exclude(vnode_watermark.watermark())` into
                                // `Include(next_key(vnode_watermark.watermark()))`.
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off `Exclude(vnode_watermark.watermark())..` and keep
                                    // `..=Include(vnode_watermark.watermark())`.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches
    ///   the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and
    /// store the old value.
    LogStoreEnabled,
}

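/// Builder for [`StateTableInner`].
///
/// A typical construction sequence (illustrative sketch; `catalog`, `store`, `vnodes`, and
/// `config` are assumed to be in scope):
///
/// ```ignore
/// let table: StateTable<_> = StateTableBuilder::new(&catalog, store, vnodes)
///     .with_op_consistency_level(StateTableOpConsistencyLevel::ConsistentOldValue)
///     .enable_preload_all_rows_by_config(&config)
///     .build()
///     .await;
/// ```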
pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = %self.table_catalog.id, e = %e.as_report(), "table configured to preload rows to memory, but the feature is disabled by license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, are used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store, with sanity checks disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Use `None` if `value_indices` contains all columns in order, i.e., no projection
        // ("no shuffle") is needed.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table is versioned, it supports schema changes. In that case, the row
        // encoding should be column-aware as well; otherwise both are false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Get info for the replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);

        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    // TODO(ttl): may introduce a new type for watermarks not in the pk.
                    WatermarkSerdeType::NonPkPrefix,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Restore the persisted table watermark.
        //
        // Note: currently the underlying local state store only exposes persisted watermarks for
        // the `PkPrefix` type (i.e., the first pk column), so we only restore in that case.
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, WatermarkSerdeType::PkPrefix)) =
            watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Get the vnode value with the given (prefix of the) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from table catalog with output indices.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
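    ///
    /// Usage sketch (illustrative; `pk` is any `impl Row` matching the pk columns):
    ///
    /// ```ignore
    /// let row: Option<OwnedRow> = table.get_row(&pk).await?;
    /// ```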
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether the given primary key exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize the bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle a configuration change, when a stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// yields the barrier, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update, if there is any.
/// `StateTablePostCommit` is marked `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// the barrier has been yielded. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` was handled
/// properly: `state_table.commit()` sets `on_post_commit` to true, `StateTablePostCommit::post_yield_barrier` resets
/// it to false, and `state_table.commit()` asserts that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers, not only for barriers that update the vnode bitmap. In this way, even though we don't have scale
/// tests for all streaming executors, we can ensure that every executor covered by the normal e2e tests handles `StateTablePostCommit` properly.
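///
/// A sketch of the intended call sequence in an executor (illustrative only; `barrier`,
/// `actor_id`, and the surrounding stream machinery are assumed):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// let update_vnode_bitmap = barrier.as_update_vnode_bitmap(actor_id);
/// yield Message::Barrier(barrier);
/// post_commit.post_yield_barrier(update_vnode_bitmap).await?;
/// ```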
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. Must provide a full row corresponding to the column
    /// desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. Must provide a full row of the old value corresponding
    /// to the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new values should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record into the state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // Allow `izip`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update the watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
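    ///
    /// Sketch of typical use (illustrative): feed the watermark here; the actual cleaning is
    /// triggered by the next `commit`:
    ///
    /// ```ignore
    /// state_table.update_watermark(watermark_scalar);
    /// // ... on the next barrier:
    /// let post_commit = state_table.commit(new_epoch).await?;
    /// ```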
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    /// Whether the state table is cleaned by watermark.
    pub fn cleaned_by_watermark(&self) -> bool {
        self.clean_watermark_index.is_some()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        if self.op_consistency_level != op_consistency_level {
            // avoid flooding the e2e-test log
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark and return the vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "should not have a pending watermark on an empty pk"
        );
        let (watermark_serializer, watermark_type) = self
            .watermark_serde
            .as_ref()
            .expect("watermark serde should be initialized to commit watermark");

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), watermark_serializer);
        let vnode_watermark = VnodeWatermark::new(
            self.vnodes().clone(),
            Bytes::copy_from_slice(watermark_suffix.as_ref()),
        );

        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");

        let order_type = watermark_serializer.get_order_types().first().unwrap();
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };

        self.committed_watermark = Some(watermark);
        Some((direction, vec![vnode_watermark], *watermark_type))
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

// Manually expand trait aliases for a better IDE experience.
pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

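/// Controls whether iterators materialize the key for each row: implemented by `Bytes`, which
/// re-prefixes the vnode onto the serialized key, and by `()`, which discards the key (used by
/// [`StateTableInner::iter_with_vnode`] when only rows are needed).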
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}
1381
// Iterator functions
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the given `pk_range` under the
    /// given `vnode`.
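    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `table: StateTableInner` with a single-column pk and an
    /// already-computed `vnode` (these names are illustrative, not part of this API):
    ///
    /// ```ignore
    /// use std::ops::Bound::{Included, Unbounded};
    ///
    /// let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) =
    ///     (Included(OwnedRow::new(vec![Some(1i64.into())])), Unbounded);
    /// let stream = table
    ///     .iter_with_vnode(vnode, &pk_range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row: OwnedRow = row?;
    ///     // ... consume `row`
    /// }
    /// ```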
    pub async fn iter_with_vnode(
        &self,
        // The vnode under which the range is scanned. This is currently required; in the
        // future it could become optional, with `None` meaning iterating over every vnode
        // that the `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

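    /// Same as [`Self::iter_with_vnode`], but yields [`KeyedRow`]s carrying the serialized
    /// table key along with each row.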
    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

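    /// Variant of [`Self::iter_with_vnode`] for replicated tables that projects each row
    /// to the configured `output_indices`. Panics if the table is not replicated.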
    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// The lowest-level iteration API, on top of which the middle-level APIs are built:
    /// - [`StateTableInner::iter_with_prefix_inner`]
    /// - [`StateTableInner::iter_kv_with_pk_range`]
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

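    /// Same as [`Self::iter_kv`], but scans the key range in reverse order.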
    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the given `pk_prefix` and
    /// `sub_range`, under the vnode computed from `pk_prefix`. If `sub_range` is
    /// `(Unbounded, Unbounded)`, it scans all rows sharing `pk_prefix`. `pk_prefix` is also
    /// used to identify the exact vnode that the scan is performed on.
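    ///
    /// # Example
    ///
    /// A minimal sketch, assuming a `table` whose pk starts with a key column `key`
    /// (these names are illustrative, not part of this API):
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(row::once(Some(key.clone())), sub_range, Default::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row = row?;
    ///     // ... consume `row`
    /// }
    /// ```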
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table that stores at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the value from a state table that stores at most one row with a single column.
    ///
    /// `None` can mean either that the row was never persisted, or that a persisted `NULL`
    /// was stored, which does not matter in the intended use cases.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

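    /// Same as [`Self::iter_with_prefix`], but yields [`KeyedRow`]s carrying the serialized
    /// table key along with each row.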
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assertion fails, then something must be wrong with the operator implementation or
        // the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct prefix hint for prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// This function scans raw key-values from the relational table with the given `pk_range`
    /// under the given `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode under which the range is scanned. This is currently required; in the
        // future it could become optional, with `None` meaning iterating over every vnode
        // that the `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        // TODO: provide a trace of useful params.
        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }
}

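/// Deserializes a raw key-value [`StateStoreIter`] into a stream of `(key, row)` pairs.
/// The key type `K` controls whether the key bytes are materialized (e.g. `Bytes`) or
/// discarded (`()`).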
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

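/// Converts a `range` of (partial) pk rows into a memcomparable byte range over the
/// serialized keys.
///
/// A minimal sketch of the mapping, assuming a `pk_serde` that serializes a single
/// ascending integer column (names are illustrative):
///
/// ```ignore
/// use std::ops::Bound::{Included, Unbounded};
///
/// let range = (
///     Included(OwnedRow::new(vec![Some(1i32.into())])),
///     Unbounded,
/// );
/// let (start, end) = prefix_range_to_memcomparable(&pk_serde, &range);
/// // `start` is `Included` of the memcomparable encoding of the row `(1,)`;
/// // `end` stays `Unbounded`.
/// ```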
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

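/// Like [`prefix_range_to_memcomparable`], but chains a fixed `pk_prefix` in front of the
/// `sub_range` bounds, so that the resulting byte range only covers keys sharing that prefix.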
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

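/// Serializes a start bound of a pk (prefix) into a memcomparable bound. An `Excluded`
/// prefix is widened with `start_bound_of_excluded_prefix` so that every key sharing the
/// excluded prefix is skipped, not just the exact key.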
fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

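/// Serializes an end bound of a pk (prefix) into a memcomparable bound. An `Included`
/// prefix is widened with `end_bound_of_prefix` so that every key sharing the prefix is
/// covered; an `Unbounded` bound ends at the end of `serialized_pk_prefix`, if one is given.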
fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

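/// Rebuilds a replicated chunk into the full schema described by `data_types`: columns
/// present in `i2o_mapping` are copied from the input chunk, while non-output columns are
/// filled with `NULL`s of the corresponding type.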
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}