risingwave_stream/common/table/state_table.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::{ready, try_join_all};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::keyed_cache_may_stale;
use crate::executor::monitor::streaming_stats::StateTableMetrics;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// Per-vnode statistics for pruning. None means this stat is not maintained.
/// For each vnode, we maintain the min and max storage table key (excluding the vnode part) observed in the vnode.
/// The stat won't differentiate between tombstone and normal keys.
struct VnodeStatistics {
    min_key: Option<Bytes>,
    max_key: Option<Bytes>,
}

impl VnodeStatistics {
    fn new() -> Self {
        Self {
            min_key: None,
            max_key: None,
        }
    }

    fn update_with_key(&mut self, key: &Bytes) {
        if let Some(min) = &self.min_key {
            if key < min {
                self.min_key = Some(key.clone());
            }
        } else {
            self.min_key = Some(key.clone());
        }

        if let Some(max) = &self.max_key {
            if key > max {
                self.max_key = Some(key.clone());
            }
        } else {
            self.max_key = Some(key.clone());
        }
    }

    fn can_prune(&self, key: &Bytes) -> bool {
        if let Some(min) = &self.min_key
            && key < min
        {
            return true;
        }
        if let Some(max) = &self.max_key
            && key > max
        {
            return true;
        }
        false
    }

    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
        // Check if the range is completely outside vnode bounds
        if let Some(max) = &self.max_key {
            match start {
                Included(s) if s > max => return true,
                Excluded(s) if s >= max => return true,
                _ => {}
            }
        }
        if let Some(min) = &self.min_key {
            match end {
                Included(e) if e < min => return true,
                Excluded(e) if e <= min => return true,
                _ => {}
            }
        }
        false
    }

    fn pruned_key_range(
        &self,
        start: &Bound<Bytes>,
        end: &Bound<Bytes>,
    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
        if self.can_prune_range(start, end) {
            return None;
        }
        let new_start = if let Some(min) = &self.min_key {
            match start {
                Included(s) if s <= min => Included(min.clone()),
                Excluded(s) if s < min => Included(min.clone()),
                _ => start.clone(),
            }
        } else {
            start.clone()
        };

        let new_end = if let Some(max) = &self.max_key {
            match end {
                Included(e) if e >= max => Included(max.clone()),
                Excluded(e) if e > max => Included(max.clone()),
                _ => end.clone(),
            }
        } else {
            end.clone()
        };

        Some((new_start, new_end))
    }
}
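
// A minimal illustration (not part of the production flow) of how these stats prune reads,
// assuming `Bytes` literals as keys:
//
//     let mut stats = VnodeStatistics::new();
//     stats.update_with_key(&Bytes::from_static(b"bb"));
//     stats.update_with_key(&Bytes::from_static(b"dd"));
//     assert!(stats.can_prune(&Bytes::from_static(b"aa"))); // below the observed min
//     assert!(!stats.can_prune(&Bytes::from_static(b"cc"))); // within [min, max]
//     // Bounded range ends are clamped to the observed bounds:
//     let start = Included(Bytes::from_static(b"aa"));
//     let end = Included(Bytes::from_static(b"zz"));
//     assert_eq!(
//         stats.pruned_key_range(&start, &end),
//         Some((
//             Included(Bytes::from_static(b"bb")),
//             Included(Bytes::from_static(b"dd")),
//         ))
//     );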

/// `StateTableInner` is the interface accessing relational data in KV (`StateStore`) with
/// row-based encoding.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data
    store: S,

    /// Current epoch
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the indices are based on all columns of the table, not just the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary key maps to a vnode in this set are
    /// visible to the executor. The table also checks that written rows conform to this
    /// partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// The index of the watermark column used for state cleaning in all columns.
    pub clean_watermark_index: Option<usize>,
    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Serializer and serde type for the watermark column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of the table columns.
    /// Used to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` uses `BasicSerde` by default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

// initialize
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
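    ///
    /// A hedged sketch of the expected ordering in an executor (names illustrative, not taken
    /// from this file):
    ///
    /// ```ignore
    /// let first_barrier = expect_first_barrier(&mut input).await?;
    /// yield Message::Barrier(first_barrier.clone());
    /// state_table.init_epoch(first_barrier.epoch).await?;
    /// ```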
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}
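
// The closure built above implements the `ConsistentOldValue` sanity check: byte-identical old
// values pass immediately; otherwise both sides are deserialized with the row serde, so that
// values that are equal as rows but differ in encoding still pass, and a genuine mismatch is
// logged before the check fails.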

macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
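
// For example, `dispatch_value_indices!(&self.value_indices, [row], { ... })` rebinds `row` to
// `row.project(value_indices)` before evaluating the body when a projection is configured, and
// evaluates the body on the unprojected row otherwise.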

/// `StateTableRowStore` extracts the storage-access logic from `StateTable`. It serves a
/// functionality similar to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to
/// read (`get` and `iter`) over a serialized key (or key range), returning `OwnedRow`s, and to
/// write (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    row_serde: Arc<SD>,
    // should only be used for debugging, in the panic message of `handle_mem_table_error`
    pk_serde: OrderedRowSerde,

    // Per-vnode min/max key statistics for pruning
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    // Optional metrics for state table operations
    pub metrics: Option<StateTableMetrics>,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if self.vnode_stats.is_none() {
            return Ok(());
        }

        // vnode stats must be disabled when all rows are preloaded
        assert!(self.all_rows.is_none());

        let start_time = Instant::now();
        let mut stats_map = HashMap::new();

        // Scan each vnode to get min/max keys
        for vnode in vnode_bitmap.iter_vnodes() {
            let mut stats = VnodeStatistics::new();

            // Get min key via forward iteration
            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
            let read_options = ReadOptions {
                cache_policy: CachePolicy::Fill(Hint::Low),
                ..Default::default()
            };

            let mut iter = self
                .state_store
                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
                .await?;
            if let Some(item) = iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            // Get max key via reverse iteration
            let mut rev_iter = self
                .state_store
                .rev_iter(memcomparable_range_with_vnode, read_options)
                .await?;
            if let Some(item) = rev_iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            stats_map.insert(vnode, stats);
        }

        self.vnode_stats = Some(stats_map);

        // avoid flooding e2e-test log
        if !cfg!(debug_assertions) {
            info!(
                table_id = %self.table_id,
                vnode_count = vnode_bitmap.count_ones(),
                duration = ?start_time.elapsed(),
                "finished initializing vnode statistics"
            );
        }

        Ok(())
    }

    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(
                    table_id = %self.table_id,
                    vnode_count = vnode_bitmap.count_ones(),
                    duration = ?start_time.elapsed(),
                    "finished reloading all rows"
                );
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await?;
        self.may_load_vnode_stats(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        self.may_load_vnode_stats(&vnodes).await?;

        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // split_off returns everything after the given key, including the key.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn Excluded(vnode_watermark.watermark()) into Included(next_key(vnode_watermark.watermark())).
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off and discard (Excluded(watermark()) ..), keeping (.. Included(watermark())).
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled by a written non-pk-prefix watermark");
                    self.all_rows = None;
                }
                WatermarkSerdeType::Value => {
                    warn!(table_id = %self.table_id, "row preloading disabled by a written value watermark");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that in `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}
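
// How these levels map to the storage-level `OpConsistencyLevel` (see `consistent_old_value_op`
// and `seal_current_epoch` above): `Inconsistent` maps to `OpConsistencyLevel::Inconsistent`,
// while `ConsistentOldValue` and `LogStoreEnabled` both install the old-value check, the latter
// with `is_log_store = true` so that old values are retained for the log store.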

pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,
    enable_vnode_key_pruning: Option<bool>,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            enable_vnode_key_pruning: None,
            metrics: None,
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            enable_vnode_key_pruning: self.enable_vnode_key_pruning,
            metrics: self.metrics,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }

    pub fn enable_vnode_key_pruning(mut self, enable: bool) -> Self {
        self.enable_vnode_key_pruning = Some(enable);
        self
    }

    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id=%self.table_catalog.id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
            preload_all_rows = false;
        }

        let should_enable_vnode_key_pruning = if preload_all_rows
            && let Some(enable_vnode_key_pruning) = self.enable_vnode_key_pruning
            && enable_vnode_key_pruning
        {
            false
        } else {
            self.enable_vnode_key_pruning.unwrap_or(false)
        };

        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
            should_enable_vnode_key_pruning,
            self.metrics,
        )
        .await
    }
}
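
// A hedged usage sketch of the builder (assuming a `table_catalog`, `store`, `vnodes`, and the
// streaming `config` are in scope; not a prescribed construction path):
//
//     let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, Some(vnodes))
//         .with_op_consistency_level(StateTableOpConsistencyLevel::ConsistentOldValue)
//         .enable_preload_all_rows_by_config(&config)
//         .build()
//         .await;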

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here
// should be used by replicated state table.
// Apart from from_table_catalog_inner.
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    #[allow(clippy::too_many_arguments)]
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
        enable_vnode_key_pruning: bool,
        metrics: Option<StateTableMetrics>,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `value_indices` is `None` if it covers all columns in their original order (i.e. no
        // shuffle is needed).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema change; in that case, the row
        // encoding should be column aware as well. Otherwise both will be false.
        // NOTE(kwannoel): Replicated table will follow upstream table's versioning. I'm not sure
        // if ALTER TABLE will propagate to this replicated table as well. Ideally it won't
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Get info for replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute i2o mapping
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any 1 of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::Value,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Restore persisted table watermark.
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                // Need to maintain vnode min/max key stats when vnode key pruning is enabled
                vnode_stats: enable_vnode_key_pruning.then(HashMap::new),
                metrics,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check pk indices of upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }

    pub fn metrics(&self) -> Option<&StateTableMetrics> {
        self.row_store.metrics.as_ref()
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether the given primary key exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
        }

        // Try to prune using vnode statistics
        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(None);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
        }

        // Try to prune using vnode statistics
        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(false);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we will mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
/// reset the flag to false; on the next `state_table.commit()`, we will assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers, rather than only for the barrier with a vnode bitmap update. In this way, though we don't have scale
/// tests for every streaming executor, we can ensure that all executors covered by the normal e2e tests have properly handled the `StateTablePostCommit`.
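///
/// A hedged sketch of the expected call sequence in an executor (illustrative only):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier);
/// post_commit.post_yield_barrier(new_vnode_bitmap).await?;
/// ```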
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Returns `Some(((new_vnodes, old_vnodes, state_table), keyed_cache_may_stale))` if the vnode bitmap is updated.
    ///
    /// Note that `keyed_cache_may_stale` only applies to keyed caches. If the executor's cache is not keyed but will
    /// be consumed with all vnodes it owns, the executor may need to ALWAYS clear the cache regardless of this flag.
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, keyed_cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if keyed_cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            keyed_cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        // Update vnode statistics (skip if all_rows is present)
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        // Fold the deleted key into the vnode statistics (skip if all_rows is present).
        // Deletes never shrink the min/max bounds: extending them with the deleted key keeps
        // the bounds a sound over-approximation of the remaining keys, so pruning stays correct.
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .remove(&key_without_vnode);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();

        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();

        // Update does not change the key, so the statistics remain valid (skip if all_rows is
        // present); we still fold the key in to keep the bookkeeping uniform across write paths.
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}
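
// All three write paths above keep the auxiliary views coherent with the mem-table: they mirror
// the mutation into `all_rows` when row preloading is enabled, and fold the key into
// `vnode_stats` when vnode key pruning is enabled, before forwarding the operation to the state
// store.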
1396
1397impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1398where
1399    S: StateStore,
1400    SD: ValueRowSerde,
1401{
1402    /// Insert a row into state table. Must provide a full row corresponding to the column desc of
1403    /// the table.
1404    pub fn insert(&mut self, value: impl Row) {
1405        let pk_indices = &self.pk_indices;
1406        let pk = (&value).project(pk_indices);
1407
1408        let key_bytes = self.serialize_pk(&pk);
1409        dispatch_value_indices!(&self.value_indices, [value], {
1410            self.row_store.insert(key_bytes, value)
1411        })
1412    }
1413
1414    /// Delete a row from state table. Must provide a full row of old value corresponding to the
1415    /// column desc of the table.
1416    pub fn delete(&mut self, old_value: impl Row) {
1417        let pk_indices = &self.pk_indices;
1418        let pk = (&old_value).project(pk_indices);
1419
1420        let key_bytes = self.serialize_pk(&pk);
1421        dispatch_value_indices!(&self.value_indices, [old_value], {
1422            self.row_store.delete(key_bytes, old_value)
1423        })
1424    }
1425
1426    /// Update a row. The old and new value should have the same pk.
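    ///
    /// Sketch, reusing the hypothetical `(id, name)` schema from [`Self::insert`]:
    ///
    /// ```ignore
    /// let old = OwnedRow::new(vec![Some(1i64.into()), Some("alice".into())]);
    /// let new = OwnedRow::new(vec![Some(1i64.into()), Some("bob".into())]);
    /// // The pk (`id = 1`) is identical; only the value columns change.
    /// table.update(old, new);
    /// ```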
1427    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
1428        let old_pk = (&old_value).project(self.pk_indices());
1429        let new_pk = (&new_value).project(self.pk_indices());
1430        debug_assert!(
1431            Row::eq(&old_pk, new_pk),
1432            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
1433            self.table_id
1434        );
1435
1436        let key_bytes = self.serialize_pk(&new_pk);
1437        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
1438            self.row_store.update(key_bytes, old_value, new_value)
1439        })
1440    }
1441
1442    /// Write a record into state table. Must have the same schema as the table.
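    ///
    /// Sketch of the dispatch, one call per record variant:
    ///
    /// ```ignore
    /// table.write_record(Record::Insert { new_row: row_a });
    /// table.write_record(Record::Update { old_row: row_a, new_row: row_b }); // same pk
    /// table.write_record(Record::Delete { old_row: row_b });
    /// ```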
1443    pub fn write_record(&mut self, record: Record<impl Row>) {
1444        match record {
1445            Record::Insert { new_row } => self.insert(new_row),
1446            Record::Delete { old_row } => self.delete(old_row),
1447            Record::Update { old_row, new_row } => self.update(old_row, new_row),
1448        }
1449    }
1450
1451    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
1452        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
1453    }
1454
1455    /// Write batch with a `StreamChunk` which should have the same schema as the table.
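    ///
    /// Sketch, assuming the `StreamChunk::from_pretty` test helper and an `(i32, varchar)` schema:
    ///
    /// ```ignore
    /// let chunk = StreamChunk::from_pretty(
    ///     " i T
    ///     + 1 alice
    ///     - 2 bob",
    /// );
    /// table.write_chunk(chunk);
    /// ```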
1456    // Allow `izip`, which uses `zip` instead of `zip_eq`.
1457    #[allow(clippy::disallowed_methods)]
1458    pub fn write_chunk(&mut self, chunk: StreamChunk) {
1459        let chunk = if IS_REPLICATED {
1460            self.fill_non_output_indices(chunk)
1461        } else {
1462            chunk
1463        };
1464
1465        let vnodes = self
1466            .distribution
1467            .compute_chunk_vnode(&chunk, &self.pk_indices);
1468
1469        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
1470            let Some((op, row)) = optional_row else {
1471                continue;
1472            };
1473            let pk = row.project(&self.pk_indices);
1474            let vnode = vnodes[idx];
1475            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
1476            match op {
1477                Op::Insert | Op::UpdateInsert => {
1478                    dispatch_value_indices!(&self.value_indices, [row], {
1479                        self.row_store.insert(key_bytes, row);
1480                    });
1481                }
1482                Op::Delete | Op::UpdateDelete => {
1483                    dispatch_value_indices!(&self.value_indices, [row], {
1484                        self.row_store.delete(key_bytes, row);
1485                    });
1486                }
1487            }
1488        }
1489    }
1490
1491    /// Update watermark for state cleaning.
1492    ///
1493    /// # Arguments
1494    ///
1495    /// * `watermark` - Latest watermark received.
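    ///
    /// Sketch: the watermark stays pending until the next `commit`, at which point state below it
    /// becomes eligible for cleaning.
    ///
    /// ```ignore
    /// table.update_watermark(ScalarImpl::Int64(100));
    /// // ... later, at the barrier ...
    /// let post_commit = table.commit(new_epoch).await?;
    /// ```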
1496    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1497        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1498        self.pending_watermark = Some(watermark);
1499    }
1500
1501    /// Get the committed watermark of the state table. Watermarks should be fed into the state
1502    /// table through the `update_watermark` method.
1503    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1504        self.committed_watermark.as_ref()
1505    }
1506
1507    pub async fn commit(
1508        &mut self,
1509        new_epoch: EpochPair,
1510    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1511        self.commit_inner(new_epoch, None).await
1512    }
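
    // Typical per-barrier flow (sketch; `maybe_new_bitmap` is a hypothetical vnode-bitmap update
    // that is `Some` only on rescaling):
    //
    //     let post_commit = table.commit(new_epoch).await?;
    //     // ... yield the barrier downstream ...
    //     post_commit.post_yield_barrier(maybe_new_bitmap).await?;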
1513
1514    #[cfg(test)]
1515    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1516        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1517    }
1518
1519    pub async fn commit_assert_no_update_vnode_bitmap(
1520        &mut self,
1521        new_epoch: EpochPair,
1522    ) -> StreamExecutorResult<()> {
1523        let post_commit = self.commit_inner(new_epoch, None).await?;
1524        post_commit.post_yield_barrier(None).await?;
1525        Ok(())
1526    }
1527
1528    pub async fn commit_may_switch_consistent_op(
1529        &mut self,
1530        new_epoch: EpochPair,
1531        op_consistency_level: StateTableOpConsistencyLevel,
1532    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1533        if self.op_consistency_level != op_consistency_level {
1534            // Log only in release builds to avoid flooding the e2e-test (debug) logs.
1535            if !cfg!(debug_assertions) {
1536                info!(
1537                    ?new_epoch,
1538                    prev_op_consistency_level = ?self.op_consistency_level,
1539                    ?op_consistency_level,
1540                    table_id = %self.table_id,
1541                    "switch to new op consistency level"
1542                );
1543            }
1544            self.commit_inner(new_epoch, Some(op_consistency_level))
1545                .await
1546        } else {
1547            self.commit_inner(new_epoch, None).await
1548        }
1549    }
1550
1551    async fn commit_inner(
1552        &mut self,
1553        new_epoch: EpochPair,
1554        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1555    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1556        assert!(!self.on_post_commit);
1557        assert_eq!(
1558            self.epoch.expect("should only be called after init").curr,
1559            new_epoch.prev
1560        );
1561        if let Some(new_consistency_level) = switch_consistent_op {
1562            assert_ne!(self.op_consistency_level, new_consistency_level);
1563            self.op_consistency_level = new_consistency_level;
1564        }
1565        trace!(
1566            table_id = %self.table_id,
1567            epoch = ?self.epoch,
1568            "commit state table"
1569        );
1570
1571        let table_watermarks = self.commit_pending_watermark();
1572        self.row_store
1573            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1574            .await?;
1575        self.epoch = Some(new_epoch);
1576
1577        self.on_post_commit = true;
1578        Ok(StateTablePostCommit { inner: self })
1579    }
1580
1581    /// Commit the pending watermark and return the direction, per-vnode watermarks, and serde type to seal.
1582    fn commit_pending_watermark(
1583        &mut self,
1584    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1585        let watermark = self.pending_watermark.take()?;
1586        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1587
1588        assert!(
1589            !self.pk_indices().is_empty(),
1590            "see pending watermark on empty pk"
1591        );
1592        let (watermark_serializer, watermark_type) = self
1593            .watermark_serde
1594            .as_ref()
1595            .expect("watermark serde should be initialized to commit watermark");
1596        let watermark_suffix =
1597            serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
1598        let vnode_watermark = VnodeWatermark::new(
1599            self.vnodes().clone(),
1600            Bytes::copy_from_slice(watermark_suffix.as_ref()),
1601        );
1602        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");
1603
1604        let order_type = watermark_serializer.get_order_types().first().unwrap();
1605        let direction = if order_type.is_ascending() {
1606            WatermarkDirection::Ascending
1607        } else {
1608            WatermarkDirection::Descending
1609        };
1610
1611        self.committed_watermark = Some(watermark);
1612        Some((direction, vec![vnode_watermark], *watermark_type))
1613    }
1614
1615    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1616        self.row_store.try_flush().await?;
1617        Ok(())
1618    }
1619}
1620
1621// Manually expand trait alias for better IDE experience.
1622pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
1623impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1624
1625pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
1626impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1627
1628pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
1629impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1630
1631pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;
1632
1633pub trait FromVnodeBytes {
1634    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
1635}
1636
1637impl FromVnodeBytes for Bytes {
1638    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
1639        prefix_slice_with_vnode(vnode, bytes)
1640    }
1641}
1642
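// The unit impl is for callers that do not need the key: the vnode and key bytes are simply
// dropped, so no per-row key allocation is paid.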
1643impl FromVnodeBytes for () {
1644    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
1645}
1646
1647// Iterator functions
1648impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1649where
1650    S: StateStore,
1651    SD: ValueRowSerde,
1652{
1653    /// This function scans rows from the relational table with specific `pk_range` under the same
1654    /// `vnode`.
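    ///
    /// Sketch (a full scan of one vnode; `vnode` must be covered by this table's distribution):
    ///
    /// ```ignore
    /// let range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_vnode(vnode, &range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row: OwnedRow = row?;
    ///     // ... process `row` ...
    /// }
    /// ```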
1655    pub async fn iter_with_vnode(
1656        &self,
1657
1658        // Vnode under which the iteration is performed: the iterator only covers the given
1659        // range under this vnode. In the future, this parameter could become optional so that,
1660        // when unspecified, we iterate over every vnode the `StateTableInner` owns.
1661        vnode: VirtualNode,
1662        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1663        prefetch_options: PrefetchOptions,
1664    ) -> StreamExecutorResult<impl RowStream<'_>> {
1665        Ok(self
1666            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1667            .await?
1668            .map_ok(|(_, row)| row))
1669    }
1670
1671    pub async fn iter_keyed_row_with_vnode(
1672        &self,
1673        vnode: VirtualNode,
1674        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1675        prefetch_options: PrefetchOptions,
1676    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1677        Ok(self
1678            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1679            .await?
1680            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1681    }
1682
1683    pub async fn iter_with_vnode_and_output_indices(
1684        &self,
1685        vnode: VirtualNode,
1686        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1687        prefetch_options: PrefetchOptions,
1688    ) -> StreamExecutorResult<impl RowStream<'_>> {
1689        assert!(IS_REPLICATED);
1690        let stream = self
1691            .iter_with_vnode(vnode, pk_range, prefetch_options)
1692            .await?;
1693        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
1694    }
1695}
1696
1697impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1698    /// The lowest-level iteration API.
1699    /// It is called by the middle-level APIs:
1700    /// - [`StateTableInner::iter_with_prefix_inner`]
1701    /// - [`StateTableInner::iter_kv_with_pk_range`]
1702    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1703        &self,
1704        vnode: VirtualNode,
1705        (start, end): (Bound<Bytes>, Bound<Bytes>),
1706        prefix_hint: Option<Bytes>,
1707        prefetch_options: PrefetchOptions,
1708    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1709        if let Some(m) = &self.metrics {
1710            m.iter_count.inc();
1711        }
1712        // Check if we can prune the entire range using vnode statistics
1713        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
1714            && let Some(vnode_stat) = stats.get(&vnode)
1715        {
1716            match vnode_stat.pruned_key_range(&start, &end) {
1717                Some((new_start, new_end)) => (new_start, new_end),
1718                None => {
1719                    if let Some(m) = &self.metrics {
1720                        m.iter_vnode_pruned_count.inc();
1721                    }
1722                    return Ok(futures::future::Either::Left(futures::stream::empty()));
1723                }
1724            }
1725        } else {
1726            (start, end)
1727        };
1728
1729        if let Some(rows) = &self.all_rows {
1730            return Ok(futures::future::Either::Right(
1731                futures::future::Either::Left(futures::stream::iter(
1732                    rows.get(&vnode)
1733                        .expect("covered vnode")
1734                        .range((pruned_start, pruned_end))
1735                        .map(move |(key, value)| {
1736                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1737                        }),
1738                )),
1739            ));
1740        }
1741        let read_options = ReadOptions {
1742            prefix_hint,
1743            prefetch_options,
1744            cache_policy: CachePolicy::Fill(Hint::Normal),
1745        };
1746
1747        Ok(futures::future::Either::Right(
1748            futures::future::Either::Right(deserialize_keyed_row_stream(
1749                self.state_store
1750                    .iter(
1751                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1752                        read_options,
1753                    )
1754                    .await?,
1755                &*self.row_serde,
1756            )),
1757        ))
1758    }
1759
1760    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1761        &self,
1762        vnode: VirtualNode,
1763        (start, end): (Bound<Bytes>, Bound<Bytes>),
1764        prefix_hint: Option<Bytes>,
1765        prefetch_options: PrefetchOptions,
1766    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1767        if let Some(m) = &self.metrics {
1768            m.iter_count.inc();
1769        }
1770        // Check if we can prune the entire range using vnode statistics
1771        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
1772            && let Some(vnode_stat) = stats.get(&vnode)
1773        {
1774            match vnode_stat.pruned_key_range(&start, &end) {
1775                Some((new_start, new_end)) => (new_start, new_end),
1776                None => {
1777                    if let Some(m) = &self.metrics {
1778                        m.iter_vnode_pruned_count.inc();
1779                    }
1780                    return Ok(futures::future::Either::Left(futures::stream::empty()));
1781                }
1782            }
1783        } else {
1784            (start, end)
1785        };
1786
1787        if let Some(rows) = &self.all_rows {
1788            return Ok(futures::future::Either::Right(
1789                futures::future::Either::Left(futures::stream::iter(
1790                    rows.get(&vnode)
1791                        .expect("covered vnode")
1792                        .range((pruned_start, pruned_end))
1793                        .rev()
1794                        .map(move |(key, value)| {
1795                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1796                        }),
1797                )),
1798            ));
1799        }
1800        let read_options = ReadOptions {
1801            prefix_hint,
1802            prefetch_options,
1803            cache_policy: CachePolicy::Fill(Hint::Normal),
1804        };
1805
1806        Ok(futures::future::Either::Right(
1807            futures::future::Either::Right(deserialize_keyed_row_stream(
1808                self.state_store
1809                    .rev_iter(
1810                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1811                        read_options,
1812                    )
1813                    .await?,
1814                &*self.row_serde,
1815            )),
1816        ))
1817    }
1818}
1819
1820impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1821where
1822    S: StateStore,
1823    SD: ValueRowSerde,
1824{
1825    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
1826    /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
1827    /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
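    ///
    /// Sketch, assuming the first pk column is `Int64`: the prefix pins the vnode and the
    /// sub-range narrows the scan within it.
    ///
    /// ```ignore
    /// let sub_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(row::once(Some(42i64.into())), &sub_range, Default::default())
    ///     .await?;
    /// ```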
1828    pub async fn iter_with_prefix(
1829        &self,
1830        pk_prefix: impl Row,
1831        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1832        prefetch_options: PrefetchOptions,
1833    ) -> StreamExecutorResult<impl RowStream<'_>> {
1834        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
1835            .await?;
1836        Ok(stream.map_ok(|(_, row)| row))
1837    }
1838
1839    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
1840    /// `vnode`, and filters out rows based on watermarks. It calls `iter_with_prefix` and further filters rows
1841    /// based on the table watermark retrieved from the state store.
1842    ///
1843    /// The caller must ensure that `clean_watermark_index` is set before calling this method; otherwise, all rows are returned without filtering.
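    ///
    /// Sketch:
    ///
    /// ```ignore
    /// let stream = table
    ///     .iter_with_prefix_respecting_watermark(prefix, &sub_range, Default::default())
    ///     .await?;
    /// // Rows whose watermark column falls behind the committed table watermark are skipped.
    /// ```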
1844    pub async fn iter_with_prefix_respecting_watermark(
1845        &self,
1846        pk_prefix: impl Row,
1847        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1848        prefetch_options: PrefetchOptions,
1849    ) -> StreamExecutorResult<BoxedRowStream<'_>> {
1850        let vnode = self.compute_prefix_vnode(&pk_prefix);
1851        let stream = self
1852            .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
1853            .await?;
1854        let Some(clean_watermark_index) = self.clean_watermark_index else {
1855            return Ok(stream.boxed());
1856        };
1857        let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
1858            return Err(StreamExecutorError::from(anyhow!(
1859                "Missing watermark serde"
1860            )));
1861        };
1862        // Fast path. TableWatermarksIndex::rewrite_range_with_table_watermark has already filtered the rows.
1863        if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
1864            return Ok(stream.boxed());
1865        }
1866        let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
1867        let Some(watermark_bytes) = watermark_bytes else {
1868            return Ok(stream.boxed());
1869        };
1870        let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
1871        if watermark_row.len() != 1 {
1872            return Err(StreamExecutorError::from(anyhow!(
1873                "Watermark row should have exactly 1 column, got {}",
1874                watermark_row.len()
1875            )));
1876        }
1877        let watermark_value = watermark_row[0].clone();
1878        // StateTableInner::update_watermark should ensure that the watermark is not NULL
1879        if watermark_value.is_none() {
1880            return Err(StreamExecutorError::from(anyhow!(
1881                "Watermark cannot be NULL"
1882            )));
1883        }
1884        let order_type = watermark_serde.get_order_types().first().ok_or_else(|| {
1885            StreamExecutorError::from(anyhow!(
1886                "Watermark serde should have at least one order type"
1887            ))
1888        })?;
1889        let direction = if order_type.is_ascending() {
1890            WatermarkDirection::Ascending
1891        } else {
1892            WatermarkDirection::Descending
1893        };
1894        let stream = stream.try_filter_map(move |row| {
1895            let watermark_col_value = row.datum_at(clean_watermark_index);
1896            let should_filter = direction.datum_filter_by_watermark(
1897                watermark_col_value,
1898                &watermark_value,
1899                *order_type,
1900            );
1901            if should_filter {
1902                ready(Ok(None))
1903            } else {
1904                ready(Ok(Some(row)))
1905            }
1906        });
1907        Ok(stream.boxed())
1908    }
1909
1910    /// Get the row from a state table with only 1 row.
1911    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
1912        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
1913        let stream = self
1914            .iter_with_prefix(row::empty(), sub_range, Default::default())
1915            .await?;
1916        pin_mut!(stream);
1917
1918        if let Some(res) = stream.next().await {
1919            let value = res?.into_owned_row();
1920            assert!(stream.next().await.is_none());
1921            Ok(Some(value))
1922        } else {
1923            Ok(None)
1924        }
1925    }
1926
1927    /// Get the row from a state table with only 1 row, and the row has only 1 col.
1928    ///
1929    /// `None` can mean either the row is never persisted, or is a persisted `NULL`,
1930    /// which does not matter in the use case.
1931    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
1932        Ok(self
1933            .get_from_one_row_table()
1934            .await?
1935            .and_then(|row| row[0].clone()))
1936    }
1937
1938    pub async fn iter_keyed_row_with_prefix(
1939        &self,
1940        pk_prefix: impl Row,
1941        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1942        prefetch_options: PrefetchOptions,
1943    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1944        Ok(
1945            self.iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
1946                .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
1947        )
1948    }
1949
1950    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
1951    pub async fn rev_iter_with_prefix(
1952        &self,
1953        pk_prefix: impl Row,
1954        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1955        prefetch_options: PrefetchOptions,
1956    ) -> StreamExecutorResult<impl RowStream<'_>> {
1957        Ok(
1958            self.iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
1959                .await?.map_ok(|(_, row)| row),
1960        )
1961    }
1962
1963    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
1964        &self,
1965        pk_prefix: impl Row,
1966        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1967        prefetch_options: PrefetchOptions,
1968    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1969        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
1970        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
1971
1972        // We assume that all usages of iterating the state table only access a single vnode.
1973        // If this assertion fails, then something must be wrong with the operator implementation or
1974        // the distribution derivation from the optimizer.
1975        let vnode = self.compute_prefix_vnode(&pk_prefix);
1976
1977        // Construct prefix hint for prefix bloom filter.
1978        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
1979        if self.prefix_hint_len != 0 {
1980            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
1981        }
1982        let prefix_hint = {
1983            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
1984                None
1985            } else {
1986                let encoded_prefix_len = self
1987                    .pk_serde
1988                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
1989
1990                Some(Bytes::copy_from_slice(
1991                    &encoded_prefix[..encoded_prefix_len],
1992                ))
1993            }
1994        };
1995
1996        trace!(
1997            table_id = %self.table_id(),
1998            ?prefix_hint, ?pk_prefix,
1999            ?pk_prefix_indices,
2000            iter_direction = if REVERSE { "reverse" } else { "forward" },
2001            "storage_iter_with_prefix"
2002        );
2003
2004        let memcomparable_range =
2005            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
2006
2007        Ok(if REVERSE {
2008            futures::future::Either::Left(
2009                self.row_store
2010                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2011                    .await?,
2012            )
2013        } else {
2014            futures::future::Either::Right(
2015                self.row_store
2016                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2017                    .await?,
2018            )
2019        })
2020    }
2021
2022    /// This function scans raw key-values from the relational table with specific `pk_range` under
2023    /// the same `vnode`.
2024    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
2025        &'a self,
2026        pk_range: &(Bound<impl Row>, Bound<impl Row>),
2027        // Vnode under which the iteration is performed: the iterator only covers the given
2028        // range under this vnode. In the future, this parameter could become optional so that,
2029        // when unspecified, we iterate over every vnode the `StateTableInner` owns.
2030        vnode: VirtualNode,
2031        prefetch_options: PrefetchOptions,
2032    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
2033        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
2034
2035        // TODO: provide a trace of useful params.
2036        self.row_store
2037            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
2038            .await
2039    }
2040}
2041
2042fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
2043    iter: impl StateStoreIter + 'a,
2044    deserializer: &'a impl ValueRowSerde,
2045) -> impl PkRowStream<'a, K> {
2046    iter.into_stream(move |(key, value)| {
2047        Ok((
2048            K::copy_from_slice(key.user_key.table_key.as_ref()),
2049            deserializer.deserialize(value).map(OwnedRow::new)?,
2050        ))
2051    })
2052    .map_err(Into::into)
2053}
2054
2055pub fn prefix_range_to_memcomparable(
2056    pk_serde: &OrderedRowSerde,
2057    range: &(Bound<impl Row>, Bound<impl Row>),
2058) -> (Bound<Bytes>, Bound<Bytes>) {
2059    (
2060        start_range_to_memcomparable(pk_serde, &range.0),
2061        end_range_to_memcomparable(pk_serde, &range.1, None),
2062    )
2063}
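
// For example, given a pk of `(a: Int32, b: Int32)` and a range `Excluded((1,)) .. Included((3,))`
// over the `a` prefix: the start bound becomes `start_bound_of_excluded_prefix(ser(1))`, skipping
// every key whose first column is 1, while the end bound becomes `end_bound_of_prefix(ser(3))`,
// keeping every key whose first column is 3, i.e., prefix semantics rather than point bounds.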
2064
2065fn prefix_and_sub_range_to_memcomparable(
2066    pk_serde: &OrderedRowSerde,
2067    sub_range: &(Bound<impl Row>, Bound<impl Row>),
2068    pk_prefix: impl Row,
2069) -> (Bound<Bytes>, Bound<Bytes>) {
2070    let (range_start, range_end) = sub_range;
2071    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2072    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2073    let start_range = match range_start {
2074        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
2075        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
2076        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
2077    };
2078    let end_range = match range_end {
2079        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
2080        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
2081        Unbounded => Unbounded,
2082    };
2083    (
2084        start_range_to_memcomparable(pk_serde, &start_range),
2085        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
2086    )
2087}
2088
2089fn start_range_to_memcomparable<R: Row>(
2090    pk_serde: &OrderedRowSerde,
2091    bound: &Bound<R>,
2092) -> Bound<Bytes> {
2093    let serialize_pk_prefix = |pk_prefix: &R| {
2094        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2095        serialize_pk(pk_prefix, &prefix_serializer)
2096    };
2097    match bound {
2098        Unbounded => Unbounded,
2099        Included(r) => {
2100            let serialized = serialize_pk_prefix(r);
2101
2102            Included(serialized)
2103        }
2104        Excluded(r) => {
2105            let serialized = serialize_pk_prefix(r);
2106
2107            start_bound_of_excluded_prefix(&serialized)
2108        }
2109    }
2110}
2111
2112fn end_range_to_memcomparable<R: Row>(
2113    pk_serde: &OrderedRowSerde,
2114    bound: &Bound<R>,
2115    serialized_pk_prefix: Option<Bytes>,
2116) -> Bound<Bytes> {
2117    let serialize_pk_prefix = |pk_prefix: &R| {
2118        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2119        serialize_pk(pk_prefix, &prefix_serializer)
2120    };
2121    match bound {
2122        Unbounded => match serialized_pk_prefix {
2123            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
2124            None => Unbounded,
2125        },
2126        Included(r) => {
2127            let serialized = serialize_pk_prefix(r);
2128
2129            end_bound_of_prefix(&serialized)
2130        }
2131        Excluded(r) => {
2132            let serialized = serialize_pk_prefix(r);
2133            Excluded(serialized)
2134        }
2135    }
2136}
2137
2138fn fill_non_output_indices(
2139    i2o_mapping: &ColIndexMapping,
2140    data_types: &[DataType],
2141    chunk: StreamChunk,
2142) -> StreamChunk {
2143    let cardinality = chunk.cardinality();
2144    let (ops, columns, vis) = chunk.into_inner();
2145    let mut full_columns = Vec::with_capacity(data_types.len());
2146    for (i, data_type) in data_types.iter().enumerate() {
2147        if let Some(j) = i2o_mapping.try_map(i) {
2148            full_columns.push(columns[j].clone());
2149        } else {
2150            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
2151            column_builder.append_n_null(cardinality);
2152            let column: ArrayRef = column_builder.finish().into();
2153            full_columns.push(column)
2154        }
2155    }
2156    let data_chunk = DataChunk::new(full_columns, vis);
2157    StreamChunk::from_parts(ops, data_chunk)
2158}
2159
2160#[cfg(test)]
2161mod tests {
2162    use std::fmt::Debug;
2163
2164    use expect_test::{Expect, expect};
2165
2166    use super::*;
2167
2168    fn check(actual: impl Debug, expect: Expect) {
2169        let actual = format!("{:#?}", actual);
2170        expect.assert_eq(&actual);
2171    }
2172
2173    #[test]
2174    fn test_fill_non_output_indices() {
2175        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
2176        let replicated_chunk = [OwnedRow::new(vec![
2177            Some(222_i32.into()),
2178            Some(2_i32.into()),
2179        ])];
2180        let replicated_chunk = StreamChunk::from_parts(
2181            vec![Op::Insert],
2182            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
2183        );
2184        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
2185        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
2186        check(
2187            filled_chunk,
2188            expect![[r#"
2189            StreamChunk { cardinality: 1, capacity: 1, data:
2190            +---+---+---+-----+
2191            | + | 2 |   | 222 |
2192            +---+---+---+-----+
2193             }"#]],
2194        );
2195    }
2196}