risingwave_stream/common/table/
state_table.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::executor::StreamExecutorResult;
use crate::executor::monitor::streaming_stats::StateTableMetrics;

/// This macro marks a point where we randomly discard the operation and return early,
/// only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// Per-vnode key statistics for pruning; a `None` bound means it is not maintained.
/// For each vnode, we maintain the min and max storage table key (excluding the vnode part)
/// observed in the vnode. The stats do not differentiate between tombstone and normal keys.
struct VnodeStatistics {
    min_key: Option<Bytes>,
    max_key: Option<Bytes>,
}

impl VnodeStatistics {
    fn new() -> Self {
        Self {
            min_key: None,
            max_key: None,
        }
    }

    fn update_with_key(&mut self, key: &Bytes) {
        if let Some(min) = &self.min_key {
            if key < min {
                self.min_key = Some(key.clone());
            }
        } else {
            self.min_key = Some(key.clone());
        }

        if let Some(max) = &self.max_key {
            if key > max {
                self.max_key = Some(key.clone());
            }
        } else {
            self.max_key = Some(key.clone());
        }
    }

    fn can_prune(&self, key: &Bytes) -> bool {
        if let Some(min) = &self.min_key
            && key < min
        {
            return true;
        }
        if let Some(max) = &self.max_key
            && key > max
        {
            return true;
        }
        false
    }

    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
        // Check whether the range lies completely outside the vnode bounds.
        if let Some(max) = &self.max_key {
            match start {
                Included(s) if s > max => return true,
                Excluded(s) if s >= max => return true,
                _ => {}
            }
        }
        if let Some(min) = &self.min_key {
            match end {
                Included(e) if e < min => return true,
                Excluded(e) if e <= min => return true,
                _ => {}
            }
        }
        false
    }

    fn pruned_key_range(
        &self,
        start: &Bound<Bytes>,
        end: &Bound<Bytes>,
    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
        if self.can_prune_range(start, end) {
            return None;
        }
        let new_start = if let Some(min) = &self.min_key {
            match start {
                Included(s) if s <= min => Included(min.clone()),
                Excluded(s) if s < min => Included(min.clone()),
                _ => start.clone(),
            }
        } else {
            start.clone()
        };

        let new_end = if let Some(max) = &self.max_key {
            match end {
                Included(e) if e >= max => Included(max.clone()),
                Excluded(e) if e > max => Included(max.clone()),
                _ => end.clone(),
            }
        } else {
            end.clone()
        };

        Some((new_start, new_end))
    }
}
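
// A minimal illustration of the pruning semantics above (hypothetical test module, not part of
// the original file): bounds only widen via `update_with_key`, and point/range reads strictly
// outside the observed `[min, max]` can be skipped.
#[cfg(test)]
mod vnode_statistics_example {
    use super::*;

    #[test]
    fn prune_outside_observed_bounds() {
        let mut stats = VnodeStatistics::new();
        stats.update_with_key(&Bytes::from_static(b"b"));
        stats.update_with_key(&Bytes::from_static(b"d"));

        // Point reads outside [b, d] are prunable; reads inside are not.
        assert!(stats.can_prune(&Bytes::from_static(b"a")));
        assert!(!stats.can_prune(&Bytes::from_static(b"c")));

        // A range that ends before min can be pruned entirely.
        assert!(stats.can_prune_range(
            &Included(Bytes::from_static(b"0")),
            &Excluded(Bytes::from_static(b"b")),
        ));

        // An overlapping range is clamped to the observed bounds.
        let (start, end) = stats
            .pruned_key_range(
                &Included(Bytes::from_static(b"a")),
                &Included(Bytes::from_static(b"z")),
            )
            .unwrap();
        assert_eq!(start, Included(Bytes::from_static(b"b")));
        assert_eq!(end, Included(Bytes::from_static(b"d")));
    }
}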

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data
    store: S,

    /// Current epoch
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the indices are based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set are visible
    /// to the executor. The table also checks whether written rows conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// The index, among all columns, of the watermark column used for state cleaning.
    pub clean_watermark_index: Option<usize>,
    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Serializer and serde type for the watermark column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of all columns, needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" refers to the base `state_table`'s actual schema.
    /// "o" refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// so that the schema of this row matches the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as the default serde.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

// initialize
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
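    ///
    /// A sketch of the expected call order (illustrative only; the surrounding executor loop and
    /// the `expect_first_barrier` helper are assumptions, not defined in this file):
    ///
    /// ```ignore
    /// let first_barrier = expect_first_barrier(&mut input).await?;
    /// yield Message::Barrier(first_barrier.clone()); // yield the barrier first ...
    /// state_table.init_epoch(first_barrier.epoch).await?; // ... then initialize the epoch
    /// ```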
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
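
// For illustration (hypothetical invocation, not from the original code): with
// `value_indices = &Some(vec![1, 2])`, a call like
// `dispatch_value_indices!(value_indices, [row], { store.insert(key, row) })`
// expands to roughly:
//
//     let row = row.project(value_indices);
//     store.insert(key, row)
//
// while with `value_indices = &None`, the body runs on the unprojected row.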

/// Extracts the logic of `StateTableRowStore` from `StateTable`. It serves similar
/// functionality to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to read
/// (`get` and `iter`) over a serialized key (or key range), returning `OwnedRow`, and to
/// write (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    // Should only be used for debugging, in the panic message of `handle_mem_table_error`.
    pk_serde: OrderedRowSerde,

    // Per-vnode min/max key statistics for pruning
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    // Optional metrics for state table operations
    pub metrics: Option<StateTableMetrics>,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if self.vnode_stats.is_none() {
            return Ok(());
        }

        // Vnode stats must be disabled when all rows are preloaded.
        assert!(self.all_rows.is_none());

        let start_time = Instant::now();
        let mut stats_map = HashMap::new();

        // Scan each vnode to get its min/max keys.
        for vnode in vnode_bitmap.iter_vnodes() {
            let mut stats = VnodeStatistics::new();

            // Get the min key via forward iteration.
            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
            let read_options = ReadOptions {
                retention_seconds: self.table_option.retention_seconds,
                cache_policy: CachePolicy::Fill(Hint::Low),
                ..Default::default()
            };

            let mut iter = self
                .state_store
                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
                .await?;
            if let Some(item) = iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            // Get the max key via reverse iteration.
            let mut rev_iter = self
                .state_store
                .rev_iter(memcomparable_range_with_vnode, read_options)
                .await?;
            if let Some(item) = rev_iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            stats_map.insert(vnode, stats);
        }

        self.vnode_stats = Some(stats_map);

        // Avoid flooding the e2e-test log.
        if !cfg!(debug_assertions) {
            info!(
                table_id = %self.table_id,
                vnode_count = vnode_bitmap.count_ones(),
                duration = ?start_time.elapsed(),
                "finished initializing vnode statistics"
            );
        }

        Ok(())
    }

    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // Avoid flooding the e2e-test log.
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await?;
        self.may_load_vnode_stats(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        self.may_load_vnode_stats(&vnodes).await?;

        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // split_off returns everything after the given key, including the key.
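                                    // e.g. with keys {"a", "c", "e"} and ascending watermark
                                    // "c", the `split_off` below keeps {"c", "e"} and drops
                                    // {"a"}: states below the watermark are cleaned.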
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn Exclude(watermark) into Include(next_key(watermark)).
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off and discard (Exclude(watermark) ..), keeping
                                    // (.. Include(watermark)).
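                                    // e.g. with keys {"a", "c", "e"} and descending watermark
                                    // "c", splitting off at `next_key("c")` drops {"e"} and
                                    // keeps {"a", "c"}: states above the watermark are cleaned.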
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading for this table is disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches
    ///   the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and
    /// store the old value.
    LogStoreEnabled,
}

pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,
    enable_vnode_key_pruning: Option<bool>,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            enable_vnode_key_pruning: None,
            metrics: None,
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            enable_vnode_key_pruning: self.enable_vnode_key_pruning,
            metrics: self.metrics,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }

    pub fn enable_vnode_key_pruning(mut self, enable: bool) -> Self {
        self.enable_vnode_key_pruning = Some(enable);
        self
    }

    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id=%self.table_catalog.id, e=%e.as_report(), "table configured to preload rows to memory, but this is disabled by the license");
            preload_all_rows = false;
        }

        // Vnode key pruning is redundant when all rows are preloaded, so disable it in that case.
        let should_enable_vnode_key_pruning = if preload_all_rows
            && let Some(enable_vnode_key_pruning) = self.enable_vnode_key_pruning
            && enable_vnode_key_pruning
        {
            false
        } else {
            self.enable_vnode_key_pruning.unwrap_or(false)
        };

        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
            should_enable_vnode_key_pruning,
            self.metrics,
        )
        .await
    }
}
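
// An illustrative construction flow (hypothetical setup: `table_catalog`, `store`, `vnodes`,
// and `config` are assumed to exist in the caller; this is a sketch, not code from this file):
//
//     let table = StateTableBuilder::new(&table_catalog, store, vnodes)
//         .with_op_consistency_level(StateTableOpConsistencyLevel::ConsistentOldValue)
//         .enable_preload_all_rows_by_config(&config)
//         .build()
//         .await;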

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, should be used by a replicated state table.
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    #[allow(clippy::too_many_arguments)]
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
        enable_vnode_key_pruning: bool,
        metrics: Option<StateTableMetrics>,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `value_indices` is `None` if it covers all table columns in order (i.e., no value
        // projection is needed).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema change. In that case, the row
        // encoding should be column-aware as well. Otherwise, both will be false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Get info for the replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);

        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    // TODO(ttl): may introduce a new type for watermark not in pk.
                    WatermarkSerdeType::NonPkPrefix,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });
        // Restore the persisted table watermark.
        //
        // Note: currently the underlying local state store only exposes persisted watermarks for
        // the `PkPrefix` type (i.e., the first PK column), so we only restore in that case.
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, WatermarkSerdeType::PkPrefix)) =
            watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                // Vnode min/max key stats must be maintained when vnode key pruning is enabled.
                vnode_stats: enable_vnode_key_pruning.then(HashMap::new),
                metrics,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }

    pub fn metrics(&self) -> Option<&StateTableMetrics> {
        self.row_store.metrics.as_ref()
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from a table catalog with output indices.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether the given primary key exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize the bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
        }

        // Try to prune using vnode statistics
        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(None);
        }

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
        }

        // Try to prune using vnode statistics
        if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            return Ok(false);
        }

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle a configuration change, when a stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update, if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` has been
/// handled properly. On `state_table.commit()`, we mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we
/// reset the flag to false; on the next `state_table.commit()`, we assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers, rather than only for barriers that update the vnode bitmap. In this way, though we don't have scale
/// tests for all streaming executors, we can ensure that all executors covered by the normal e2e tests have properly handled the
/// `StateTablePostCommit`.
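///
/// A sketch of the intended usage inside an executor (illustrative only; the surrounding
/// executor loop is an assumption, not part of this file):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier);
/// post_commit
///     .post_yield_barrier(update_vnode_bitmap /* `Option<Arc<Bitmap>>` */)
///     .await?;
/// ```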
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        // Update vnode statistics (skipped if `all_rows` is present).
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();

        let (vnode, key_without_vnode) = key.split_vnode_bytes();

        // A delete never shrinks the min/max bounds: `update_with_key` can only widen them, so
        // the (possibly stale) bounds remain a conservative superset of the live keys, which is
        // still sound for pruning (skipped if `all_rows` is present).
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .remove(&key_without_vnode);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();

        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();

        // An update does not change the key, so the statistics remain valid; we still feed the
        // key to keep the bounds a superset of all touched keys (skipped if `all_rows` is
        // present).
        if self.all_rows.is_none()
            && let Some(stats) = &mut self.vnode_stats
            && let Some(vnode_stat) = stats.get_mut(&vnode)
        {
            vnode_stat.update_with_key(&key_without_vnode);
        }

        if let Some(rows) = &mut self.all_rows {
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key_without_vnode, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}
1400
1401impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1402where
1403    S: StateStore,
1404    SD: ValueRowSerde,
1405{
1406    /// Insert a row into state table. Must provide a full row corresponding to the column desc of
1407    /// the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. Must provide a full row of the old value
    /// corresponding to the column descs of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new value should have the same pk.
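    ///
    /// A hedged sketch (hypothetical `table` handle; both arguments are full rows
    /// whose pk columns are equal, as the debug assertion below enforces):
    /// ```ignore
    /// table.update(old_row, new_row); // pk(old_row) == pk(new_row)
    /// ```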
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record into the state table. Must have the same schema as the table.
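    ///
    /// The mapping to the row-level APIs, as a sketch (hypothetical `table` handle):
    /// ```ignore
    /// table.write_record(Record::Insert { new_row });          // = table.insert(new_row)
    /// table.write_record(Record::Delete { old_row });          // = table.delete(old_row)
    /// table.write_record(Record::Update { old_row, new_row }); // = table.update(old_row, new_row)
    /// ```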
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which must have the same schema as the table.
    // Allow `izip!`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
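    ///
    /// A hedged sketch (hypothetical `watermark_scalar` derived from the table's
    /// watermark column; the watermark only takes effect on the next `commit`):
    /// ```ignore
    /// table.update_watermark(watermark_scalar);
    /// ```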
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

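    /// Commit the dirty state and seal the current epoch with `new_epoch`.
    ///
    /// A typical barrier-handling sketch (hypothetical names; passing `None` to
    /// `post_yield_barrier` means no vnode bitmap update, mirroring
    /// `commit_assert_no_update_vnode_bitmap` below):
    /// ```ignore
    /// let post_commit = table.commit(barrier.epoch).await?;
    /// // ... yield the barrier downstream ...
    /// post_commit.post_yield_barrier(None).await?;
    /// ```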
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        if self.op_consistency_level != op_consistency_level {
            // Avoid flooding the e2e-test log.
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark and return vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let (watermark_serializer, watermark_type) = self
            .watermark_serde
            .as_ref()
            .expect("watermark serde should be initialized to commit watermark");

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), watermark_serializer);
        let vnode_watermark = VnodeWatermark::new(
            self.vnodes().clone(),
            Bytes::copy_from_slice(watermark_suffix.as_ref()),
        );

        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");

        // The watermark direction is determined by the order type of the first
        // (watermark) column.
        let order_type = watermark_serializer.get_order_types().first().unwrap();
        let direction = if order_type.is_ascending() {
            WatermarkDirection::Ascending
        } else {
            WatermarkDirection::Descending
        };

        self.committed_watermark = Some(watermark);
        Some((direction, vec![vnode_watermark], *watermark_type))
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

// Manually expand trait alias for better IDE experience.
pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

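/// Reconstructs the key type `K` yielded by the iterators from a vnode and the key
/// bytes without the vnode prefix. Implemented for `Bytes` (materialize the full
/// key) and `()` (discard the key when only rows are needed).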
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

// Iterator functions.
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under the
    /// same `vnode`.
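    ///
    /// A hedged usage sketch (hypothetical `table` and `vnode`; the unbounded range
    /// scans every row under the vnode):
    /// ```ignore
    /// let range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_vnode(vnode, &range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row: OwnedRow = row?;
    ///     // ...
    /// }
    /// ```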
    pub async fn iter_with_vnode(
        &self,
        // The vnode to iterate under; the returned stream covers only the given
        // range within this vnode. In the future this could become optional, so
        // that `None` iterates over every vnode the `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// The lowest-level iterator API, wrapped by the middle-level APIs:
    /// - [`StateTableInner::iter_with_prefix_inner`]
    /// - [`StateTableInner::iter_kv_with_pk_range`]
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        // Check whether the range can be narrowed, or the scan pruned entirely,
        // using the per-vnode key statistics.
        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => (new_start, new_end),
                None => {
                    // The requested range is disjoint from the observed key range:
                    // skip the storage iterator and return an empty stream.
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    return Ok(futures::future::Either::Left(futures::stream::empty()));
                }
            }
        } else {
            (start, end)
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(futures::stream::iter(
                    rows.get(&vnode)
                        .expect("covered vnode")
                        .range((pruned_start, pruned_end))
                        .map(move |(key, value)| {
                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                        }),
                )),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.state_store
                    .iter(
                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                        read_options,
                    )
                    .await?,
                &*self.row_serde,
            )),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(m) = &self.metrics {
            m.iter_count.inc();
        }
        // Same pruning as in `iter_kv`, using the per-vnode key statistics.
        let (pruned_start, pruned_end) = if let Some(stats) = &self.vnode_stats
            && let Some(vnode_stat) = stats.get(&vnode)
        {
            match vnode_stat.pruned_key_range(&start, &end) {
                Some((new_start, new_end)) => (new_start, new_end),
                None => {
                    if let Some(m) = &self.metrics {
                        m.iter_vnode_pruned_count.inc();
                    }
                    return Ok(futures::future::Either::Left(futures::stream::empty()));
                }
            }
        } else {
            (start, end)
        };

        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Right(
                futures::future::Either::Left(futures::stream::iter(
                    rows.get(&vnode)
                        .expect("covered vnode")
                        .range((pruned_start, pruned_end))
                        .rev()
                        .map(move |(key, value)| {
                            Ok((K::from_vnode_bytes(vnode, key), value.clone()))
                        }),
                )),
            ));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(
                        prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
                        read_options,
                    )
                    .await?,
                &*self.row_serde,
            )),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the given `pk_prefix` and
    /// `sub_range` under the same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it scans
    /// all rows sharing the `pk_prefix`. The `pk_prefix` also identifies the exact vnode the scan
    /// is performed on.
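    ///
    /// A hedged usage sketch (hypothetical `group_key` prefix row; the unbounded
    /// sub-range scans every row sharing the prefix, as in `get_from_one_row_table`
    /// below):
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(group_key, sub_range, PrefetchOptions::default())
    ///     .await?;
    /// ```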
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table with only 1 row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the row from a state table with only 1 row, and the row has only 1 col.
    ///
    /// `None` can mean either the row is never persisted, or is a persisted `NULL`,
    /// which does not matter in the use case.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assertion fails, then something must be wrong with the operator implementation or
        // the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct prefix hint for prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// This function scans raw key-values from the relational table with the specific `pk_range`
    /// under the same `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode to iterate under; the returned stream covers only the given
        // range within this vnode. In the future this could become optional, so
        // that `None` iterates over every vnode the `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        // TODO: provide a trace of useful params.
        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }
}

/// Deserialize a raw key-value stream from the state store into `(key, row)` pairs,
/// converting storage errors into stream-executor errors.
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

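/// Convert a row-level `pk_range` into the memcomparable byte range used by the
/// storage iterators. Prefix bounds are widened so that an included/excluded prefix
/// covers/skips *all* keys sharing that prefix (see `start_range_to_memcomparable`
/// and `end_range_to_memcomparable` below).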
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            // An excluded prefix bound must skip *all* keys sharing the prefix.
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            // An included prefix bound must cover *all* keys sharing the prefix.
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            // Non-output columns are filled with NULLs of the proper type.
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
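
    #[test]
    fn test_prefix_range_to_memcomparable_bounds() {
        // A hedged sketch exercising the bound mapping in
        // `prefix_range_to_memcomparable` above. It only asserts variant-level
        // behavior that follows directly from that code; the exact serialized
        // bytes are deliberately left unchecked. Assumes `OrderedRowSerde::new`
        // takes the schema and the per-column order types.
        let pk_serde = OrderedRowSerde::new(vec![DataType::Int32], vec![OrderType::ascending()]);

        // Unbounded bounds pass through untouched.
        let (start, end) = prefix_range_to_memcomparable(
            &pk_serde,
            &(Bound::<OwnedRow>::Unbounded, Bound::<OwnedRow>::Unbounded),
        );
        assert!(matches!(start, Unbounded));
        assert!(matches!(end, Unbounded));

        // An inclusive start bound serializes the prefix as-is.
        let row = OwnedRow::new(vec![Some(42_i32.into())]);
        let (start, _) = prefix_range_to_memcomparable(
            &pk_serde,
            &(Included(row), Bound::<OwnedRow>::Unbounded),
        );
        assert!(matches!(start, Included(_)));
    }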
}