risingwave_stream/common/table/state_table.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use anyhow::anyhow;
use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::{ready, try_join_all};
use futures::stream::BoxStream;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::keyed_cache_may_stale;
use crate::executor::monitor::streaming_stats::StateTableMetrics;
use crate::executor::{StreamExecutorError, StreamExecutorResult};

/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// Per-vnode statistics for pruning. None means this stat is not maintained.
/// For each vnode, we maintain the min and max storage table key (excluding the vnode part) observed in the vnode.
/// The stat won't differentiate between tombstone and normal keys.
struct VnodeStatistics {
    min_key: Option<Bytes>,
    max_key: Option<Bytes>,
}

impl VnodeStatistics {
    fn new() -> Self {
        Self {
            min_key: None,
            max_key: None,
        }
    }

    fn update_with_key(&mut self, key: &Bytes) {
        if let Some(min) = &self.min_key {
            if key < min {
                self.min_key = Some(key.clone());
            }
        } else {
            self.min_key = Some(key.clone());
        }

        if let Some(max) = &self.max_key {
            if key > max {
                self.max_key = Some(key.clone());
            }
        } else {
            self.max_key = Some(key.clone());
        }
    }

    fn can_prune(&self, key: &Bytes) -> bool {
        if let Some(min) = &self.min_key
            && key < min
        {
            return true;
        }
        if let Some(max) = &self.max_key
            && key > max
        {
            return true;
        }
        false
    }

    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
        // Check if the range is completely outside vnode bounds
        if let Some(max) = &self.max_key {
            match start {
                Included(s) if s > max => return true,
                Excluded(s) if s >= max => return true,
                _ => {}
            }
        }
        if let Some(min) = &self.min_key {
            match end {
                Included(e) if e < min => return true,
                Excluded(e) if e <= min => return true,
                _ => {}
            }
        }
        false
    }

    fn pruned_key_range(
        &self,
        start: &Bound<Bytes>,
        end: &Bound<Bytes>,
    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
        if self.can_prune_range(start, end) {
            return None;
        }
        let new_start = if let Some(min) = &self.min_key {
            match start {
                Included(s) if s <= min => Included(min.clone()),
                Excluded(s) if s < min => Included(min.clone()),
                _ => start.clone(),
            }
        } else {
            start.clone()
        };

        let new_end = if let Some(max) = &self.max_key {
            match end {
                Included(e) if e >= max => Included(max.clone()),
                Excluded(e) if e > max => Included(max.clone()),
                _ => end.clone(),
            }
        } else {
            end.clone()
        };

        Some((new_start, new_end))
    }
}
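
// An added illustration (not part of the original file): a minimal test sketch of the
// pruning semantics above. Keys strictly outside the observed [min, max] range are
// prunable, and key ranges are clamped to the observed bounds.
#[cfg(test)]
mod vnode_statistics_illustration {
    use super::*;

    #[test]
    fn prune_and_clamp_sketch() {
        let mut stats = VnodeStatistics::new();
        // Before any key is observed, nothing can be pruned.
        assert!(!stats.can_prune(&Bytes::from_static(b"a")));

        stats.update_with_key(&Bytes::from_static(b"b"));
        stats.update_with_key(&Bytes::from_static(b"d"));

        // Keys outside the observed [b, d] range are prunable; keys inside are not.
        assert!(stats.can_prune(&Bytes::from_static(b"a")));
        assert!(stats.can_prune(&Bytes::from_static(b"e")));
        assert!(!stats.can_prune(&Bytes::from_static(b"c")));

        // An overlapping range is clamped to the observed bounds.
        let (start, end) = stats
            .pruned_key_range(
                &Included(Bytes::from_static(b"a")),
                &Included(Bytes::from_static(b"z")),
            )
            .unwrap();
        assert_eq!(start, Included(Bytes::from_static(b"b")));
        assert_eq!(end, Included(Bytes::from_static(b"d")));

        // A range entirely below the observed min can be pruned altogether.
        assert!(
            stats
                .pruned_key_range(
                    &Included(Bytes::from_static(b"0")),
                    &Excluded(Bytes::from_static(b"a")),
                )
                .is_none()
        );
    }
}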

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only the rows whose vnode of the primary key is in this set
    /// will be visible to the executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// The index of the watermark column used for state cleaning, among all columns.
    pub clean_watermark_index: Option<usize>,
    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Serializer and serde type for the watermark column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of all columns, needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` uses `BasicSerde` by default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;

// initialize
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, deadlock is likely to happen.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}
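
// A hypothetical lifecycle sketch for the initialization above (added comment, not
// original code; the helper and message types are assumptions from typical executor code):
//
//     let first_barrier = expect_first_barrier(&mut input).await?;
//     yield Message::Barrier(first_barrier.clone());
//     // Only now is it safe to initialize the state table's epoch.
//     state_table.init_epoch(first_barrier.epoch).await?;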

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
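
// Illustrative usage of `dispatch_value_indices!` (added comment, not original code):
// with `value_indices = Some(vec![1, 2])`, a call like
//
//     dispatch_value_indices!(&self.value_indices, [row], { store.insert(key, row) })
//
// first shadows `row` with `row.project(value_indices)` before running the body, while
// with `None` the body runs on the full row unchanged.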

/// Extracts the logic of `StateTableRowStore` from `StateTable`. It serves similar
/// functionality to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to read
/// (`get` and `iter`) over a serialized key (or key range) returning `OwnedRow`, and to
/// write (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    row_serde: Arc<SD>,
    // should only be used for debugging in the panic message of handle_mem_table_error
    pk_serde: OrderedRowSerde,

    // Per-vnode min/max key statistics for pruning
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    /// When false, vnode stats pruning is in dry-run mode:
    /// we maintain stats and verify pruning correctness but don't actually apply pruning.
    /// Reads still go to cache/storage even when pruning would indicate no results.
    enable_state_table_vnode_stats_pruning: bool,
    // Optional metrics for state table operations
    pub metrics: Option<StateTableMetrics>,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if self.vnode_stats.is_none() {
            return Ok(());
        }

        // vnode stats must be disabled when all rows are preloaded
        assert!(self.all_rows.is_none());

        let start_time = Instant::now();
        let mut stats_map = HashMap::new();

        // Scan each vnode to get min/max keys
        for vnode in vnode_bitmap.iter_vnodes() {
            let mut stats = VnodeStatistics::new();

            // Get min key via forward iteration
            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
            let read_options = ReadOptions {
                cache_policy: CachePolicy::Fill(Hint::Low),
                ..Default::default()
            };

            let mut iter = self
                .state_store
                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
                .await?;
            if let Some(item) = iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            // Get max key via reverse iteration
            let mut rev_iter = self
                .state_store
                .rev_iter(memcomparable_range_with_vnode, read_options)
                .await?;
            if let Some(item) = rev_iter.try_next().await? {
                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
                assert_eq!(vnode, key_vnode);
                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
            }

            stats_map.insert(vnode, stats);
        }

        self.vnode_stats = Some(stats_map);

        // avoid flooding e2e-test log
        if !cfg!(debug_assertions) {
            info!(
                table_id = %self.table_id,
                vnode_count = vnode_bitmap.count_ones(),
                duration = ?start_time.elapsed(),
                "finished initializing vnode statistics"
            );
        }

        Ok(())
    }

    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await?;
        self.may_load_vnode_stats(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        self.may_load_vnode_stats(&vnodes).await?;

        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // split_off returns everything after the given key, including the key.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn Exclude(vnode_watermark.watermark()) into Include(next_key(vnode_watermark.watermark())).
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // split off and discard (Excluded(vnode_watermark.watermark())..),
                                    // keeping (..=vnode_watermark.watermark())
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled for this table: a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
                WatermarkSerdeType::Value => {
                    warn!(table_id = %self.table_id, "row preloading disabled for this table: a value watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}
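
// An added sketch (not in the original file) of the `BTreeMap::split_off` trick used
// above for watermark cleaning: `split_off(k)` retains `..k` in `self` and returns `k..`.
#[cfg(test)]
mod split_off_watermark_illustration {
    use std::collections::BTreeMap;

    #[test]
    fn split_off_sketch() {
        let mut rows: BTreeMap<Vec<u8>, i32> =
            BTreeMap::from([(b"a".to_vec(), 1), (b"c".to_vec(), 2)]);
        // Ascending watermark "b": keep only keys >= "b", as in
        // `*rows = rows.split_off(vnode_watermark.watermark())` above.
        rows = rows.split_off(b"b".as_slice());
        assert!(rows.keys().eq([b"c".to_vec()].iter()));
        // For a descending watermark, the code above instead discards the returned map
        // from `split_off(next_key(watermark))`, keeping `..=watermark`.
    }
}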

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}

pub struct StateTableBuilder<'a, S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,
    enable_vnode_key_stats: Option<bool>,
    /// When false, vnode stats pruning is in dry-run mode:
    /// we maintain stats and verify pruning correctness but don't actually apply pruning.
    enable_state_table_vnode_stats_pruning: bool,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            enable_vnode_key_stats: None,
            enable_state_table_vnode_stats_pruning: false,
            metrics: None,
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            enable_vnode_key_stats: self.enable_vnode_key_stats,
            enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
            metrics: self.metrics,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id.as_raw_id())
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id.as_raw_id())
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }

    pub fn enable_vnode_key_stats(mut self, enable: bool, config: &StreamingConfig) -> Self {
        self.enable_vnode_key_stats = Some(enable);
        self.enable_state_table_vnode_stats_pruning =
            enable && config.developer.enable_state_table_vnode_stats_pruning;
        self
    }

    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
        self.metrics = Some(metrics);
        self
    }
}

impl<'a, S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<'a, S, SD, IS_REPLICATED, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id=%self.table_catalog.id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
            preload_all_rows = false;
        }

        let should_enable_vnode_key_stats = if preload_all_rows
            && let Some(enable_vnode_key_stats) = self.enable_vnode_key_stats
            && enable_vnode_key_stats
        {
            false
        } else {
            self.enable_vnode_key_stats.unwrap_or(false)
        };

        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
            should_enable_vnode_key_stats,
            self.enable_state_table_vnode_stats_pruning,
            self.metrics,
        )
        .await
    }
}
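
// A hypothetical usage sketch of the builder (added comment, not original code):
//
//     let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, Some(vnodes))
//         .with_op_consistency_level(StateTableOpConsistencyLevel::ConsistentOldValue)
//         .enable_preload_all_rows_by_config(&streaming_config)
//         .build()
//         .await;
//
// `forbid_preload_all_rows()` takes the place of `enable_preload_all_rows_by_config(..)`
// when preloading must never be enabled for the table.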

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, is used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    #[allow(clippy::too_many_arguments)]
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
        enable_vnode_key_stats: bool,
        enable_state_table_vnode_stats_pruning: bool,
        metrics: Option<StateTableMetrics>,
    ) -> Self {
        let table_id = table_catalog.id;
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `value_indices` is `None` if the input value indices cover all columns in order (no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                table_catalog.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                table_catalog.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema change. In that case, the row
        // encoding should be column-aware as well. Otherwise, both will be false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm not sure
        // if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Get info for replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute i2o mapping
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
        if clean_watermark_indices.len() > 1 {
            unimplemented!("multiple clean watermark columns are not supported yet")
        }
        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::Value,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Restore persisted table watermark.
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref()
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                // Need to maintain vnode min/max key stats when vnode key pruning is enabled
                vnode_stats: enable_vnode_key_stats.then(HashMap::new),
                enable_state_table_vnode_stats_pruning,
                metrics,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check pk indices of upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }

    pub fn metrics(&self) -> Option<&StateTableMetrics> {
        self.row_store.metrics.as_ref()
    }
}

impl<S, SD> StateTableInner<S, SD, true>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given pk exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
        }

        // Try to prune using vnode statistics
        let should_prune = if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            true
        } else {
            false
        };

        if should_prune && self.enable_state_table_vnode_stats_pruning {
            return Ok(None);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::<StreamExecutorError>::into)?;

        // In dry-run mode, verify that pruning would have been correct
        if should_prune && result.is_some() {
            tracing::warn!(
                table_id = %self.table_id,
                "vnode stats pruning dry run fails for get. This will not affect correctness."
            );
        }

        Ok(result)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(m) = &self.metrics {
            m.get_count.inc();
        }
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
        }

        // Try to prune using vnode statistics
        let should_prune = if let Some(stats) = &self.vnode_stats
            && let (vnode, key) = key_bytes.split_vnode_bytes()
            && let Some(vnode_stat) = stats.get(&vnode)
            && vnode_stat.can_prune(&key)
        {
            if let Some(m) = &self.metrics {
                m.get_vnode_pruned_count.inc();
            }
            true
        } else {
            false
        };

        if should_prune && self.enable_state_table_vnode_stats_pruning {
            return Ok(false);
        }

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        let exists = result.is_some();

        // In dry-run mode, verify that pruning would have been correct
        if should_prune && exists {
            tracing::warn!(
                table_id = %self.table_id,
                "vnode stats pruning dry run fails for exists. This will not affect correctness."
            );
        }

        Ok(exists)
    }
}
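
// Read-path summary for `get` / `exists` above (added comment, not original code):
// 1. If `all_rows` is preloaded, answer directly from the in-memory BTreeMap.
// 2. Otherwise, consult the per-vnode min/max stats; if the key falls outside the
//    observed range and pruning is enabled, answer without touching storage.
// 3. Otherwise, fall through to `state_store.on_key_value`; in dry-run mode, a storage
//    hit after a positive pruning decision is logged as a pruning failure.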

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we will mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
/// reset the flag to false, so that the next `state_table.commit()` can assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers rather than only for the barrier with a vnode bitmap update. In this way, though we don't have scale tests
/// for all streaming executors, we can ensure that all executors covered by the normal e2e tests handle the `StateTablePostCommit` properly.
#[must_use]
pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
}
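
// A hypothetical sketch of the protocol described above, as seen from an executor
// (added comment, not original code; the `commit` signature is assumed):
//
//     let post_commit = state_table.commit(barrier.epoch).await?;
//     yield Message::Barrier(barrier);
//     // Must be called for every barrier, passing the new vnode bitmap if any.
//     post_commit.post_yield_barrier(new_vnode_bitmap).await?;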

impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Returns `Some(((new_vnodes, old_vnodes, state_table), keyed_cache_may_stale))` if the vnode bitmap is updated.
    ///
    /// Note that `keyed_cache_may_stale` only applies to keyed caches. If the executor's cache is not keyed, but will
    /// be consumed with all vnodes it owns, the executor may need to ALWAYS clear the cache regardless of this flag.
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, keyed_cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if keyed_cache_may_stale {
            self.inner.pending_watermark = None;
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            keyed_cache_may_stale,
        ))
    }
}
1347
1348// write
1349impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1350    fn handle_mem_table_error(&self, e: StorageError) {
1351        let e = match e.into_inner() {
1352            ErrorKind::MemTable(e) => e,
1353            _ => unreachable!("should only get memtable error"),
1354        };
1355        match *e {
1356            MemTableError::InconsistentOperation { key, prev, new, .. } => {
1357                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
1358                panic!(
1359                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
1360                    self.table_id,
1361                    vnode,
1362                    &key,
1363                    prev.debug_fmt(&*self.row_serde),
1364                    new.debug_fmt(&*self.row_serde),
1365                )
1366            }
1367        }
1368    }
1369
1370    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
1371        insane_mode_discard_point!();
1372        let value_bytes = self.row_serde.serialize(&value).into();
1373
1374        let (vnode, key_without_vnode) = key.split_vnode_bytes();
1375
1376        // Update vnode statistics (skip if all_rows is present)
1377        if self.all_rows.is_none()
1378            && let Some(stats) = &mut self.vnode_stats
1379            && let Some(vnode_stat) = stats.get_mut(&vnode)
1380        {
1381            vnode_stat.update_with_key(&key_without_vnode);
1382        }
1383
1384        if let Some(rows) = &mut self.all_rows {
1385            rows.get_mut(&vnode)
1386                .expect("covered vnode")
1387                .insert(key_without_vnode, value.into_owned_row());
1388        }
1389        self.state_store
1390            .insert(key, value_bytes, None)
1391            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1392    }
1393
1394    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
1395        insane_mode_discard_point!();
1396        let value_bytes = self.row_serde.serialize(value).into();
1397
1398        let (vnode, key_without_vnode) = key.split_vnode_bytes();
1399
1400        if self.all_rows.is_none()
1401            && let Some(stats) = &mut self.vnode_stats
1402            && let Some(vnode_stat) = stats.get_mut(&vnode)
1403        {
1404            vnode_stat.update_with_key(&key_without_vnode);
1405        }
1406
1407        if let Some(rows) = &mut self.all_rows {
1408            rows.get_mut(&vnode)
1409                .expect("covered vnode")
1410                .remove(&key_without_vnode);
1411        }
1412        self.state_store
1413            .delete(key, value_bytes)
1414            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1415    }
1416
1417    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
1418        insane_mode_discard_point!();
1419        let new_value_bytes = self.row_serde.serialize(&new_value).into();
1420        let old_value_bytes = self.row_serde.serialize(old_value).into();
1421
1422        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();
1423
1424        // An update never changes the key, so the statistics remain valid; we still
1425        // refresh them for consistency (skipped when `all_rows` is present).
1426        if self.all_rows.is_none()
1427            && let Some(stats) = &mut self.vnode_stats
1428            && let Some(vnode_stat) = stats.get_mut(&vnode)
1429        {
1430            vnode_stat.update_with_key(&key_without_vnode);
1431        }
1432
1433        if let Some(rows) = &mut self.all_rows {
1434            rows.get_mut(&vnode)
1435                .expect("covered vnode")
1436                .insert(key_without_vnode, new_value.into_owned_row());
1437        }
1438        self.state_store
1439            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
1440            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1441    }
1442}
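
// Note on consistency checking: `insert` above passes `None` as the old value, while
// `update` passes `Some(old_value_bytes)`. Under a consistent op level, this lets the
// state store verify that the value being overwritten matches what the executor
// believes is currently stored.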
1443
1444impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1445where
1446    S: StateStore,
1447    SD: ValueRowSerde,
1448{
1449    /// Insert a row into the state table. Must provide a full row corresponding to the column desc of
1450    /// the table.
1451    pub fn insert(&mut self, value: impl Row) {
1452        let pk_indices = &self.pk_indices;
1453        let pk = (&value).project(pk_indices);
1454
1455        let key_bytes = self.serialize_pk(&pk);
1456        dispatch_value_indices!(&self.value_indices, [value], {
1457            self.row_store.insert(key_bytes, value)
1458        })
1459    }
1460
1461    /// Delete a row from the state table. Must provide a full row of the old value corresponding
1462    /// to the column desc of the table.
1463    pub fn delete(&mut self, old_value: impl Row) {
1464        let pk_indices = &self.pk_indices;
1465        let pk = (&old_value).project(pk_indices);
1466
1467        let key_bytes = self.serialize_pk(&pk);
1468        dispatch_value_indices!(&self.value_indices, [old_value], {
1469            self.row_store.delete(key_bytes, old_value)
1470        })
1471    }
1472
1473    /// Update a row. The old and new values must have the same pk.
1474    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
1475        let old_pk = (&old_value).project(self.pk_indices());
1476        let new_pk = (&new_value).project(self.pk_indices());
1477        debug_assert!(
1478            Row::eq(&old_pk, new_pk),
1479            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
1480            self.table_id
1481        );
1482
1483        let key_bytes = self.serialize_pk(&new_pk);
1484        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
1485            self.row_store.update(key_bytes, old_value, new_value)
1486        })
1487    }
1488
1489    /// Write a record into the state table. The record must have the same schema as the table.
1490    pub fn write_record(&mut self, record: Record<impl Row>) {
1491        match record {
1492            Record::Insert { new_row } => self.insert(new_row),
1493            Record::Delete { old_row } => self.delete(old_row),
1494            Record::Update { old_row, new_row } => self.update(old_row, new_row),
1495        }
1496    }
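
    // The three record shapes map directly onto the three write methods above. A
    // minimal sketch, assuming `table` is a `StateTableInner` with a matching schema
    // and `row1`/`row2`/`row3` are full rows:
    //
    //     table.write_record(Record::Insert { new_row: row1 });
    //     table.write_record(Record::Delete { old_row: row2 });
    //     table.write_record(Record::Update { old_row: row2, new_row: row3 });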
1497
1498    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
1499        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
1500    }
1501
1502    /// Write a batch with a `StreamChunk`, which must have the same schema as the table.
1503    // Allowed because `izip` uses `zip` instead of `zip_eq` internally.
1504    #[allow(clippy::disallowed_methods)]
1505    pub fn write_chunk(&mut self, chunk: StreamChunk) {
1506        let chunk = if IS_REPLICATED {
1507            self.fill_non_output_indices(chunk)
1508        } else {
1509            chunk
1510        };
1511
1512        let vnodes = self
1513            .distribution
1514            .compute_chunk_vnode(&chunk, &self.pk_indices);
1515
1516        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
1517            let Some((op, row)) = optional_row else {
1518                continue;
1519            };
1520            let pk = row.project(&self.pk_indices);
1521            let vnode = vnodes[idx];
1522            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
1523            match op {
1524                Op::Insert | Op::UpdateInsert => {
1525                    dispatch_value_indices!(&self.value_indices, [row], {
1526                        self.row_store.insert(key_bytes, row);
1527                    });
1528                }
1529                Op::Delete | Op::UpdateDelete => {
1530                    dispatch_value_indices!(&self.value_indices, [row], {
1531                        self.row_store.delete(key_bytes, row);
1532                    });
1533                }
1534            }
1535        }
1536    }
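
    // In tests, a chunk with mixed ops can be written in one call. A sketch using the
    // `StreamChunk::from_pretty` test helper (assuming a table with two `i32` columns):
    //
    //     let chunk = StreamChunk::from_pretty(
    //         "  i i
    //          + 1 4
    //         U- 2 5
    //         U+ 2 6",
    //     );
    //     table.write_chunk(chunk);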
1537
1538    /// Update watermark for state cleaning.
1539    ///
1540    /// # Arguments
1541    ///
1542    /// * `watermark` - Latest watermark received.
1543    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1544        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1545        self.pending_watermark = Some(watermark);
1546    }
1547
1548    /// Get the committed watermark of the state table. Watermarks should be fed into the state
1549    /// table through the `update_watermark` method.
1550    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1551        self.committed_watermark.as_ref()
1552    }
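
    // Typical state-cleaning flow (sketch; `table`, `watermark` and `new_epoch` are
    // assumed to be in scope):
    //
    //     table.update_watermark(watermark);   // buffered as `pending_watermark`
    //     table.commit(new_epoch).await?;      // sealed into the state store here
    //     let committed = table.get_committed_watermark();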
1553
1554    pub async fn commit(
1555        &mut self,
1556        new_epoch: EpochPair,
1557    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1558        self.commit_inner(new_epoch, None).await
1559    }
1560
1561    #[cfg(test)]
1562    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1563        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1564    }
1565
1566    pub async fn commit_assert_no_update_vnode_bitmap(
1567        &mut self,
1568        new_epoch: EpochPair,
1569    ) -> StreamExecutorResult<()> {
1570        let post_commit = self.commit_inner(new_epoch, None).await?;
1571        post_commit.post_yield_barrier(None).await?;
1572        Ok(())
1573    }
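
    // The usual barrier-handling sequence around `commit` (sketch; `barrier` and
    // `new_vnode_bitmap` — an `Option` of the updated bitmap — are assumed
    // executor-side names):
    //
    //     let post_commit = table.commit(barrier.epoch).await?;
    //     yield Message::Barrier(barrier);
    //     post_commit.post_yield_barrier(new_vnode_bitmap).await?;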
1574
1575    pub async fn commit_may_switch_consistent_op(
1576        &mut self,
1577        new_epoch: EpochPair,
1578        op_consistency_level: StateTableOpConsistencyLevel,
1579    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1580        if self.op_consistency_level != op_consistency_level {
1581            // Skip logging in debug builds to avoid flooding e2e-test logs.
1582            if !cfg!(debug_assertions) {
1583                info!(
1584                    ?new_epoch,
1585                    prev_op_consistency_level = ?self.op_consistency_level,
1586                    ?op_consistency_level,
1587                    table_id = %self.table_id,
1588                    "switch to new op consistency level"
1589                );
1590            }
1591            self.commit_inner(new_epoch, Some(op_consistency_level))
1592                .await
1593        } else {
1594            self.commit_inner(new_epoch, None).await
1595        }
1596    }
1597
1598    async fn commit_inner(
1599        &mut self,
1600        new_epoch: EpochPair,
1601        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1602    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1603        assert!(!self.on_post_commit);
1604        assert_eq!(
1605            self.epoch.expect("should only be called after init").curr,
1606            new_epoch.prev
1607        );
1608        if let Some(new_consistency_level) = switch_consistent_op {
1609            assert_ne!(self.op_consistency_level, new_consistency_level);
1610            self.op_consistency_level = new_consistency_level;
1611        }
1612        trace!(
1613            table_id = %self.table_id,
1614            epoch = ?self.epoch,
1615            "commit state table"
1616        );
1617
1618        let table_watermarks = self.commit_pending_watermark();
1619        self.row_store
1620            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1621            .await?;
1622        self.epoch = Some(new_epoch);
1623
1624        self.on_post_commit = true;
1625        Ok(StateTablePostCommit { inner: self })
1626    }
1627
1628    /// Commit pending watermark and return vnode bitmap-watermark pairs to seal.
1629    fn commit_pending_watermark(
1630        &mut self,
1631    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1632        let watermark = self.pending_watermark.take()?;
1633        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1634
1635        assert!(
1636            !self.pk_indices().is_empty(),
1637            "see pending watermark on empty pk"
1638        );
1639        let (watermark_serializer, watermark_type) = self
1640            .watermark_serde
1641            .as_ref()
1642            .expect("watermark serde should be initialized to commit watermark");
1643        let watermark_suffix =
1644            serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
1645        let vnode_watermark = VnodeWatermark::new(
1646            self.vnodes().clone(),
1647            Bytes::copy_from_slice(watermark_suffix.as_ref()),
1648        );
1649        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");
1650
1651        let order_type = watermark_serializer.get_order_types().first().unwrap();
1652        let direction = if order_type.is_ascending() {
1653            WatermarkDirection::Ascending
1654        } else {
1655            WatermarkDirection::Descending
1656        };
1657
1658        self.committed_watermark = Some(watermark);
1659        Some((direction, vec![vnode_watermark], *watermark_type))
1660    }
1661
1662    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1663        self.row_store.try_flush().await?;
1664        Ok(())
1665    }
1666}
1667
1668// Manually expand trait alias for better IDE experience.
1669pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
1670impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1671
1672pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
1673impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1674
1675pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
1676impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1677
1678pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;
1679
1680pub trait FromVnodeBytes {
1681    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
1682}
1683
1684impl FromVnodeBytes for Bytes {
1685    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
1686        prefix_slice_with_vnode(vnode, bytes)
1687    }
1688}
1689
1690impl FromVnodeBytes for () {
1691    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
1692}
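
// `Bytes` reconstructs the full vnode-prefixed table key, while `()` is used by
// row-only iterators so that key materialization can be skipped entirely.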
1693
1694// Iterator functions
1695impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1696where
1697    S: StateStore,
1698    SD: ValueRowSerde,
1699{
1700    /// This function scans rows from the relational table with a specific `pk_range` under the
1701    /// same `vnode`.
1702    pub async fn iter_with_vnode(
1703        &self,
1704        // The vnode to iterate over; only the given range under this vnode is returned.
1705        // For now, this parameter is required. In the future, when it becomes optional,
1706        // passing `None` could mean iterating over every vnode that the
1707        // `StateTableInner` owns.
1708        vnode: VirtualNode,
1709        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1710        prefetch_options: PrefetchOptions,
1711    ) -> StreamExecutorResult<impl RowStream<'_>> {
1712        Ok(self
1713            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1714            .await?
1715            .map_ok(|(_, row)| row))
1716    }
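
    // Sketch of a vnode-scoped range scan (illustrative; `table` and `vnode` are
    // assumed to be in scope):
    //
    //     let range = (
    //         Bound::Included(OwnedRow::new(vec![Some(1_i32.into())])),
    //         Bound::<OwnedRow>::Unbounded,
    //     );
    //     let stream = table
    //         .iter_with_vnode(vnode, &range, PrefetchOptions::default())
    //         .await?;
    //     pin_mut!(stream);
    //     while let Some(row) = stream.next().await {
    //         let _row: OwnedRow = row?;
    //     }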
1717
1718    pub async fn iter_keyed_row_with_vnode(
1719        &self,
1720        vnode: VirtualNode,
1721        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1722        prefetch_options: PrefetchOptions,
1723    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1724        Ok(self
1725            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1726            .await?
1727            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1728    }
1729
1730    pub async fn iter_with_vnode_and_output_indices(
1731        &self,
1732        vnode: VirtualNode,
1733        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1734        prefetch_options: PrefetchOptions,
1735    ) -> StreamExecutorResult<impl RowStream<'_>> {
1736        assert!(IS_REPLICATED);
1737        let stream = self
1738            .iter_with_vnode(vnode, pk_range, prefetch_options)
1739            .await?;
1740        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
1741    }
1742}
1743
1744impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1745    /// The lowest-level iterator API, on top of which the middle-level APIs are built:
1746    ///
1747    /// - [`StateTableInner::iter_with_prefix_inner`]
1748    /// - [`StateTableInner::iter_kv_with_pk_range`]
1749    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1750        &self,
1751        vnode: VirtualNode,
1752        (start, end): (Bound<Bytes>, Bound<Bytes>),
1753        prefix_hint: Option<Bytes>,
1754        prefetch_options: PrefetchOptions,
1755    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1756        if let Some(m) = &self.metrics {
1757            m.iter_count.inc();
1758        }
1759        // Check if we can prune the entire range using vnode statistics
1760        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1761            &self.vnode_stats
1762            && let Some(vnode_stat) = stats.get(&vnode)
1763        {
1764            match vnode_stat.pruned_key_range(&start, &end) {
1765                Some((new_start, new_end)) => {
1766                    if self.enable_state_table_vnode_stats_pruning {
1767                        (new_start, new_end, false)
1768                    } else {
1769                        // In dry-run mode, we don't apply pruning but verify correctness
1770                        (start, end, false)
1771                    }
1772                }
1773                None => {
1774                    if let Some(m) = &self.metrics {
1775                        m.iter_vnode_pruned_count.inc();
1776                    }
1777                    // Mark that we should prune entirely, but handle dry-run below
1778                    (start, end, true)
1779                }
1780            }
1781        } else {
1782            (start, end, false)
1783        };
1784
1785        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
1786            return Ok(futures::future::Either::Left(futures::stream::empty()));
1787        }
1788
1789        let table_id = self.table_id;
1790        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
1791            // Only log when in dry-run mode and we would have pruned but got results
1792            if should_prune_entirely && result.is_ok() {
1793                tracing::warn!(
1794                    table_id = %table_id,
1795                    "vnode stats pruning dry run fails for iter. This will not affect correctness."
1796                );
1797            }
1798        };
1799
1800        if let Some(rows) = &self.all_rows {
1801            return Ok(futures::future::Either::Right(
1802                futures::future::Either::Left(
1803                    futures::stream::iter(
1804                        rows.get(&vnode)
1805                            .expect("covered vnode")
1806                            .range((pruned_start, pruned_end))
1807                            .map(move |(key, value)| {
1808                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1809                            }),
1810                    )
1811                    .inspect(inspect_fn),
1812                ),
1813            ));
1814        }
1815        let read_options = ReadOptions {
1816            prefix_hint,
1817            prefetch_options,
1818            cache_policy: CachePolicy::Fill(Hint::Normal),
1819        };
1820
1821        Ok(futures::future::Either::Right(
1822            futures::future::Either::Right(
1823                deserialize_keyed_row_stream(
1824                    self.state_store
1825                        .iter(
1826                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1827                            read_options,
1828                        )
1829                        .await?,
1830                    &*self.row_serde,
1831                )
1832                .inspect(inspect_fn),
1833            ),
1834        ))
1835    }
1836
1837    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1838        &self,
1839        vnode: VirtualNode,
1840        (start, end): (Bound<Bytes>, Bound<Bytes>),
1841        prefix_hint: Option<Bytes>,
1842        prefetch_options: PrefetchOptions,
1843    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1844        if let Some(m) = &self.metrics {
1845            m.iter_count.inc();
1846        }
1847        // Check if we can prune the entire range using vnode statistics
1848        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1849            &self.vnode_stats
1850            && let Some(vnode_stat) = stats.get(&vnode)
1851        {
1852            match vnode_stat.pruned_key_range(&start, &end) {
1853                Some((new_start, new_end)) => {
1854                    if self.enable_state_table_vnode_stats_pruning {
1855                        (new_start, new_end, false)
1856                    } else {
1857                        // In dry-run mode, we don't apply pruning but verify correctness
1858                        (start, end, false)
1859                    }
1860                }
1861                None => {
1862                    if let Some(m) = &self.metrics {
1863                        m.iter_vnode_pruned_count.inc();
1864                    }
1865                    // Mark that we should prune entirely, but handle dry-run below
1866                    (start, end, true)
1867                }
1868            }
1869        } else {
1870            (start, end, false)
1871        };
1872
1873        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
1874            return Ok(futures::future::Either::Left(futures::stream::empty()));
1875        }
1876
1877        let table_id = self.table_id;
1878        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
1879            // Only log when in dry-run mode and we would have pruned but got results
1880            if should_prune_entirely && result.is_ok() {
1881                tracing::warn!(
1882                    table_id = %table_id,
1883                    "vnode stats pruning dry run fails for rev_iter. This will not affect correctness."
1884                );
1885            }
1886        };
1887
1888        if let Some(rows) = &self.all_rows {
1889            return Ok(futures::future::Either::Right(
1890                futures::future::Either::Left(
1891                    futures::stream::iter(
1892                        rows.get(&vnode)
1893                            .expect("covered vnode")
1894                            .range((pruned_start, pruned_end))
1895                            .rev()
1896                            .map(move |(key, value)| {
1897                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1898                            }),
1899                    )
1900                    .inspect(inspect_fn),
1901                ),
1902            ));
1903        }
1904        let read_options = ReadOptions {
1905            prefix_hint,
1906            prefetch_options,
1907            cache_policy: CachePolicy::Fill(Hint::Normal),
1908        };
1909
1910        Ok(futures::future::Either::Right(
1911            futures::future::Either::Right(
1912                deserialize_keyed_row_stream(
1913                    self.state_store
1914                        .rev_iter(
1915                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1916                            read_options,
1917                        )
1918                        .await?,
1919                    &*self.row_serde,
1920                )
1921                .inspect(inspect_fn),
1922            ),
1923        ))
1924    }
1925}
1926
1927impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1928where
1929    S: StateStore,
1930    SD: ValueRowSerde,
1931{
1932    /// This function scans rows from the relational table with a specific `prefix` and `sub_range` under the
1933    /// same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it scans all rows with the given `pk_prefix`.
1934    /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
1935    pub async fn iter_with_prefix(
1936        &self,
1937        pk_prefix: impl Row,
1938        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1939        prefetch_options: PrefetchOptions,
1940    ) -> StreamExecutorResult<impl RowStream<'_>> {
1941        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
1942            .await?;
1943        Ok(stream.map_ok(|(_, row)| row))
1944    }
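
    // Sketch: scan every row sharing a group-key prefix (illustrative; `table` and
    // `group_key` are assumed):
    //
    //     let sub_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    //     let stream = table
    //         .iter_with_prefix(&group_key, &sub_range, PrefetchOptions::default())
    //         .await?;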
1945
1946    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
1947    /// `vnode`, and filters out rows based on watermarks. It calls `iter_with_prefix` and further filters rows
1948    /// based on the table watermark retrieved from the state store.
1949    ///
1950    /// The caller must ensure that `clean_watermark_index` is set before calling this method; otherwise, all rows are returned without filtering.
1951    pub async fn iter_with_prefix_respecting_watermark(
1952        &self,
1953        pk_prefix: impl Row,
1954        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1955        prefetch_options: PrefetchOptions,
1956    ) -> StreamExecutorResult<BoxedRowStream<'_>> {
1957        let vnode = self.compute_prefix_vnode(&pk_prefix);
1958        let stream = self
1959            .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
1960            .await?;
1961        let Some(clean_watermark_index) = self.clean_watermark_index else {
1962            return Ok(stream.boxed());
1963        };
1964        let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
1965            return Err(StreamExecutorError::from(anyhow!(
1966                "Missing watermark serde"
1967            )));
1968        };
1969        // Fast path. TableWatermarksIndex::rewrite_range_with_table_watermark has already filtered the rows.
1970        if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
1971            return Ok(stream.boxed());
1972        }
1973        let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
1974        let Some(watermark_bytes) = watermark_bytes else {
1975            return Ok(stream.boxed());
1976        };
1977        let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
1978        if watermark_row.len() != 1 {
1979            return Err(StreamExecutorError::from(anyhow!(
1980                "Watermark row should have exactly 1 column, got {}",
1981                watermark_row.len()
1982            )));
1983        }
1984        let watermark_value = watermark_row[0].clone();
1985        // StateTableInner::update_watermark should ensure that the watermark is not NULL
1986        if watermark_value.is_none() {
1987            return Err(StreamExecutorError::from(anyhow!(
1988                "Watermark cannot be NULL"
1989            )));
1990        }
1991        let order_type = watermark_serde.get_order_types().first().ok_or_else(|| {
1992            StreamExecutorError::from(anyhow!(
1993                "Watermark serde should have at least one order type"
1994            ))
1995        })?;
1996        let direction = if order_type.is_ascending() {
1997            WatermarkDirection::Ascending
1998        } else {
1999            WatermarkDirection::Descending
2000        };
2001        let stream = stream.try_filter_map(move |row| {
2002            let watermark_col_value = row.datum_at(clean_watermark_index);
2003            let should_filter = direction.datum_filter_by_watermark(
2004                watermark_col_value,
2005                &watermark_value,
2006                *order_type,
2007            );
2008            if should_filter {
2009                ready(Ok(None))
2010            } else {
2011                ready(Ok(Some(row)))
2012            }
2013        });
2014        Ok(stream.boxed())
2015    }
2016
2017    /// Get the row from a state table with only 1 row.
2018    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
2019        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
2020        let stream = self
2021            .iter_with_prefix(row::empty(), sub_range, Default::default())
2022            .await?;
2023        pin_mut!(stream);
2024
2025        if let Some(res) = stream.next().await {
2026            let value = res?.into_owned_row();
2027            assert!(stream.next().await.is_none());
2028            Ok(Some(value))
2029        } else {
2030            Ok(None)
2031        }
2032    }
2033
2034    /// Get the row from a state table that has only 1 row, where the row has only 1 column.
2035    ///
2036    /// `None` can mean either that the row was never persisted, or that it is a persisted `NULL`;
2037    /// the distinction does not matter for this use case.
2038    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
2039        Ok(self
2040            .get_from_one_row_table()
2041            .await?
2042            .and_then(|row| row[0].clone()))
2043    }
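
    // Sketch: single-row/single-value tables typically back simple aggregations
    // (illustrative; `table` is assumed to hold at most one row with one column):
    //
    //     let value: Option<ScalarImpl> = table.get_from_one_value_table().await?;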
2044
2045    pub async fn iter_keyed_row_with_prefix(
2046        &self,
2047        pk_prefix: impl Row,
2048        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2049        prefetch_options: PrefetchOptions,
2050    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2051        Ok(
2052            self.iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
2053                .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2054        )
2055    }
2056
2057    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
2058    pub async fn rev_iter_with_prefix(
2059        &self,
2060        pk_prefix: impl Row,
2061        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2062        prefetch_options: PrefetchOptions,
2063    ) -> StreamExecutorResult<impl RowStream<'_>> {
2064        Ok(
2065            self.iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
2066                .await?.map_ok(|(_, row)| row),
2067        )
2068    }
2069
2070    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
2071        &self,
2072        pk_prefix: impl Row,
2073        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2074        prefetch_options: PrefetchOptions,
2075    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
2076        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
2077        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2078
2079        // We assume that all usages of iterating the state table only access a single vnode.
2080        // If this assertion fails, then something must be wrong with the operator implementation or
2081        // the distribution derivation from the optimizer.
2082        let vnode = self.compute_prefix_vnode(&pk_prefix);
2083
2084        // Construct prefix hint for prefix bloom filter.
2085        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
2086        if self.prefix_hint_len != 0 {
2087            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
2088        }
2089        let prefix_hint = {
2090            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
2091                None
2092            } else {
2093                let encoded_prefix_len = self
2094                    .pk_serde
2095                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
2096
2097                Some(Bytes::copy_from_slice(
2098                    &encoded_prefix[..encoded_prefix_len],
2099                ))
2100            }
2101        };
2102
2103        trace!(
2104            table_id = %self.table_id(),
2105            ?prefix_hint, ?pk_prefix,
2106            ?pk_prefix_indices,
2107            iter_direction = if REVERSE { "reverse" } else { "forward" },
2108            "storage_iter_with_prefix"
2109        );
2110
2111        let memcomparable_range =
2112            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
2113
2114        Ok(if REVERSE {
2115            futures::future::Either::Left(
2116                self.row_store
2117                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2118                    .await?,
2119            )
2120        } else {
2121            futures::future::Either::Right(
2122                self.row_store
2123                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2124                    .await?,
2125            )
2126        })
2127    }
2128
2129    /// This function scans raw key-values from the relational table with a specific `pk_range`
2130    /// under the same `vnode`.
2131    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
2132        &'a self,
2133        pk_range: &(Bound<impl Row>, Bound<impl Row>),
2134        // The vnode to iterate over; only the given range under this vnode is returned.
2135        // For now, this parameter is required. In the future, when it becomes optional,
2136        // passing `None` could mean iterating over every vnode that the `StateTableInner` owns.
2137        vnode: VirtualNode,
2138        prefetch_options: PrefetchOptions,
2139    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
2140        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
2141
2142        // TODO: provide a trace of useful params.
2143        self.row_store
2144            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
2145            .await
2146    }
2147}
2148
2149fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
2150    iter: impl StateStoreIter + 'a,
2151    deserializer: &'a impl ValueRowSerde,
2152) -> impl PkRowStream<'a, K> {
2153    iter.into_stream(move |(key, value)| {
2154        Ok((
2155            K::copy_from_slice(key.user_key.table_key.as_ref()),
2156            deserializer.deserialize(value).map(OwnedRow::new)?,
2157        ))
2158    })
2159    .map_err(Into::into)
2160}
2161
2162pub fn prefix_range_to_memcomparable(
2163    pk_serde: &OrderedRowSerde,
2164    range: &(Bound<impl Row>, Bound<impl Row>),
2165) -> (Bound<Bytes>, Bound<Bytes>) {
2166    (
2167        start_range_to_memcomparable(pk_serde, &range.0),
2168        end_range_to_memcomparable(pk_serde, &range.1, None),
2169    )
2170}
2171
2172fn prefix_and_sub_range_to_memcomparable(
2173    pk_serde: &OrderedRowSerde,
2174    sub_range: &(Bound<impl Row>, Bound<impl Row>),
2175    pk_prefix: impl Row,
2176) -> (Bound<Bytes>, Bound<Bytes>) {
2177    let (range_start, range_end) = sub_range;
2178    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2179    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2180    let start_range = match range_start {
2181        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
2182        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
2183        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
2184    };
2185    let end_range = match range_end {
2186        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
2187        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
2188        Unbounded => Unbounded,
2189    };
2190    (
2191        start_range_to_memcomparable(pk_serde, &start_range),
2192        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
2193    )
2194}
2195
2196fn start_range_to_memcomparable<R: Row>(
2197    pk_serde: &OrderedRowSerde,
2198    bound: &Bound<R>,
2199) -> Bound<Bytes> {
2200    let serialize_pk_prefix = |pk_prefix: &R| {
2201        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2202        serialize_pk(pk_prefix, &prefix_serializer)
2203    };
2204    match bound {
2205        Unbounded => Unbounded,
2206        Included(r) => {
2207            let serialized = serialize_pk_prefix(r);
2208
2209            Included(serialized)
2210        }
2211        Excluded(r) => {
2212            let serialized = serialize_pk_prefix(r);
2213
2214            start_bound_of_excluded_prefix(&serialized)
2215        }
2216    }
2217}
2218
2219fn end_range_to_memcomparable<R: Row>(
2220    pk_serde: &OrderedRowSerde,
2221    bound: &Bound<R>,
2222    serialized_pk_prefix: Option<Bytes>,
2223) -> Bound<Bytes> {
2224    let serialize_pk_prefix = |pk_prefix: &R| {
2225        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2226        serialize_pk(pk_prefix, &prefix_serializer)
2227    };
2228    match bound {
2229        Unbounded => match serialized_pk_prefix {
2230            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
2231            None => Unbounded,
2232        },
2233        Included(r) => {
2234            let serialized = serialize_pk_prefix(r);
2235
2236            end_bound_of_prefix(&serialized)
2237        }
2238        Excluded(r) => {
2239            let serialized = serialize_pk_prefix(r);
2240            Excluded(serialized)
2241        }
2242    }
2243}
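
// Worked example, assuming a single ascending `i32` pk column: for the range
// `(Excluded(row(1)), Included(row(2)))`, the start bound becomes the first key after
// the whole prefix `enc(1)` (via `start_bound_of_excluded_prefix`), and the end bound
// becomes `end_bound_of_prefix(enc(2))`, so every key that has `enc(2)` as a prefix is
// still included.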
2244
2245fn fill_non_output_indices(
2246    i2o_mapping: &ColIndexMapping,
2247    data_types: &[DataType],
2248    chunk: StreamChunk,
2249) -> StreamChunk {
2250    let cardinality = chunk.cardinality();
2251    let (ops, columns, vis) = chunk.into_inner();
2252    let mut full_columns = Vec::with_capacity(data_types.len());
2253    for (i, data_type) in data_types.iter().enumerate() {
2254        if let Some(j) = i2o_mapping.try_map(i) {
2255            full_columns.push(columns[j].clone());
2256        } else {
2257            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
2258            column_builder.append_n_null(cardinality);
2259            let column: ArrayRef = column_builder.finish().into();
2260            full_columns.push(column)
2261        }
2262    }
2263    let data_chunk = DataChunk::new(full_columns, vis);
2264    StreamChunk::from_parts(ops, data_chunk)
2265}
2266
2267#[cfg(test)]
2268mod tests {
2269    use std::fmt::Debug;
2270
2271    use expect_test::{Expect, expect};
2272
2273    use super::*;
2274
2275    fn check(actual: impl Debug, expect: Expect) {
2276        let actual = format!("{:#?}", actual);
2277        expect.assert_eq(&actual);
2278    }
2279
2280    #[test]
2281    fn test_fill_non_output_indices() {
2282        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
2283        let replicated_chunk = [OwnedRow::new(vec![
2284            Some(222_i32.into()),
2285            Some(2_i32.into()),
2286        ])];
2287        let replicated_chunk = StreamChunk::from_parts(
2288            vec![Op::Insert],
2289            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
2290        );
2291        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
2292        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
2293        check(
2294            filled_chunk,
2295            expect![[r#"
2296            StreamChunk { cardinality: 1, capacity: 1, data:
2297            +---+---+---+-----+
2298            | + | 2 |   | 222 |
2299            +---+---+---+-----+
2300             }"#]],
2301        );
2302    }
2303}