risingwave_stream/common/table/
state_table.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::ops::Bound::*;
19use std::sync::Arc;
20use std::time::Instant;
21
22use anyhow::anyhow;
23use bytes::Bytes;
24use either::Either;
25use foyer::Hint;
26use futures::future::{ready, try_join_all};
27use futures::stream::BoxStream;
28use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
29use itertools::Itertools;
30use risingwave_common::array::stream_record::Record;
31use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
32use risingwave_common::bitmap::Bitmap;
33use risingwave_common::catalog::{
34    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
35};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
38use risingwave_common::id::FragmentId;
39use risingwave_common::row::{self, OwnedRow, Row, RowExt};
40use risingwave_common::types::{DataType, ScalarImpl};
41use risingwave_common::util::column_index_mapping::ColIndexMapping;
42use risingwave_common::util::epoch::EpochPair;
43use risingwave_common::util::row_serde::OrderedRowSerde;
44use risingwave_common::util::sort_util::{OrderType, cmp_datum};
45use risingwave_common::util::value_encoding::BasicSerde;
46use risingwave_hummock_sdk::HummockReadEpoch;
47use risingwave_hummock_sdk::key::{
48    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
49    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
50};
51use risingwave_hummock_sdk::table_watermark::{
52    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
53};
54use risingwave_pb::catalog::Table;
55use risingwave_pb::plan_common::StorageTableDesc;
56use risingwave_storage::StateStore;
57use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
58use risingwave_storage::hummock::CachePolicy;
59use risingwave_storage::mem_table::MemTableError;
60use risingwave_storage::row_serde::find_columns_by_ids;
61use risingwave_storage::row_serde::row_serde_util::{
62    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
63};
64use risingwave_storage::row_serde::value_serde::ValueRowSerde;
65use risingwave_storage::store::*;
66use risingwave_storage::table::{KeyedRow, TableDistribution, should_calculate_prefix_hint};
67use thiserror_ext::AsReport;
68use tracing::{Instrument, trace};
69
70use crate::cache::keyed_cache_may_stale;
71use crate::executor::monitor::streaming_stats::StateTableMetrics;
72use crate::executor::{StreamExecutorError, StreamExecutorResult};
73
/// Marks a point where, in insane mode only, the enclosing function may randomly give up on
/// the current operation by returning early (with probability 0.3).
///
/// The expansion contains a bare `return;`, so the macro can only be used inside functions
/// whose return type is `()`.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng as _;
        let should_discard = crate::consistency::insane() && rand::rng().random_bool(0.3);
        if should_discard {
            return;
        }
    }};
}
84
85/// Per-vnode statistics for pruning. None means this stat is not maintained.
86/// For each vnode, we maintain the min and max storage table key (excluding the vnode part) observed in the vnode.
87/// The stat won't differentiate between tombstone and normal keys.
88struct VnodeStatistics {
89    min_key: Option<Bytes>,
90    max_key: Option<Bytes>,
91}
92
93impl VnodeStatistics {
94    fn new() -> Self {
95        Self {
96            min_key: None,
97            max_key: None,
98        }
99    }
100
101    fn update_with_key(&mut self, key: &Bytes) {
102        if let Some(min) = &self.min_key {
103            if key < min {
104                self.min_key = Some(key.clone());
105            }
106        } else {
107            self.min_key = Some(key.clone());
108        }
109
110        if let Some(max) = &self.max_key {
111            if key > max {
112                self.max_key = Some(key.clone());
113            }
114        } else {
115            self.max_key = Some(key.clone());
116        }
117    }
118
119    fn can_prune(&self, key: &Bytes) -> bool {
120        if let Some(min) = &self.min_key
121            && key < min
122        {
123            return true;
124        }
125        if let Some(max) = &self.max_key
126            && key > max
127        {
128            return true;
129        }
130        false
131    }
132
133    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
134        // Check if the range is completely outside vnode bounds
135        if let Some(max) = &self.max_key {
136            match start {
137                Included(s) if s > max => return true,
138                Excluded(s) if s >= max => return true,
139                _ => {}
140            }
141        }
142        if let Some(min) = &self.min_key {
143            match end {
144                Included(e) if e < min => return true,
145                Excluded(e) if e <= min => return true,
146                _ => {}
147            }
148        }
149        false
150    }
151
152    fn pruned_key_range(
153        &self,
154        start: &Bound<Bytes>,
155        end: &Bound<Bytes>,
156    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
157        if self.can_prune_range(start, end) {
158            return None;
159        }
160        let new_start = if let Some(min) = &self.min_key {
161            match start {
162                Included(s) if s <= min => Included(min.clone()),
163                Excluded(s) if s < min => Included(min.clone()),
164                _ => start.clone(),
165            }
166        } else {
167            start.clone()
168        };
169
170        let new_end = if let Some(max) = &self.max_key {
171            match end {
172                Included(e) if e >= max => Included(max.clone()),
173                Excluded(e) if e > max => Included(max.clone()),
174                _ => end.clone(),
175            }
176        } else {
177            end.clone()
178        };
179
180        Some((new_start, new_end))
181    }
182}
183
/// `StateTableInner` is the interface accessing relational data in KV(`StateStore`) with
/// row-based encoding.
///
/// Type parameters:
/// - `S`: the state store backend.
/// - `SD`: the serde used to encode and decode row values (defaults to `BasicSerde`).
/// - `IS_REPLICATED`: whether this is a replicated state table (see `ReplicatedStateTable`).
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// Row-level wrapper around the local state store backend. It may also hold an in-memory
    /// copy of all rows and per-vnode key statistics.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data
    store: S,

    /// Current epoch. Must be `None` when `init_epoch` is called, which sets it.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the index is based on the all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds vnode bitmap. Only the rows whose vnode of the primary key is in this set will be visible to the
    /// executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    /// Prefix length hint used for reads. Presumably carried over from the catalog's
    /// `read_prefix_len_hint`, as the builder reads it from there. Confirm in `build_inner`.
    prefix_hint_len: usize,

    /// Indices of the columns stored in the value part of each row.
    /// NOTE(review): `None` presumably means every column is stored. Confirm at construction.
    value_indices: Option<Vec<usize>>,

    /// The index of the watermark column used for state cleaning in all columns.
    pub clean_watermark_index: Option<usize>,
    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Serializer and serde type for the watermark column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data types of the table's columns.
    /// Needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from replicated state table.
    /// Such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    pub output_indices: Vec<usize>,

    /// Consistency level that ops written to this table are expected to satisfy.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}
254
255/// `StateTable` will use `BasicSerde` as default
256pub type StateTable<S> = StateTableInner<S, BasicSerde>;
257/// `ReplicatedStateTable` is meant to replicate upstream shared buffer.
258/// Used for `ArrangementBackfill` executor.
259pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
260
261// initialize
262impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
263where
264    S: StateStore,
265    SD: ValueRowSerde,
266{
267    /// In streaming executors, this methods must be called **after** receiving and yielding the first barrier,
268    /// and otherwise, deadlock can be likely to happen.
269    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
270        self.row_store
271            .init(epoch, self.distribution.vnodes())
272            .await?;
273        assert_eq!(None, self.epoch.replace(epoch), "should not init for twice");
274        Ok(())
275    }
276
277    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
278        self.store
279            .try_wait_epoch(
280                HummockReadEpoch::Committed(prev_epoch),
281                TryWaitEpochOptions {
282                    table_id: self.table_id,
283                },
284            )
285            .await
286    }
287
288    pub fn state_store(&self) -> &S {
289        &self.store
290    }
291}
292
293fn consistent_old_value_op(
294    row_serde: Arc<impl ValueRowSerde>,
295    is_log_store: bool,
296) -> OpConsistencyLevel {
297    OpConsistencyLevel::ConsistentOldValue {
298        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
299            if first == second {
300                return true;
301            }
302            let first = match row_serde.deserialize(first) {
303                Ok(rows) => rows,
304                Err(e) => {
305                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
306                    return false;
307                }
308            };
309            let second = match row_serde.deserialize(second) {
310                Ok(rows) => rows,
311                Err(e) => {
312                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
313                    return false;
314                }
315            };
316            if first != second {
317                error!(first = ?first, second = ?second, "sanity check fail");
318                false
319            } else {
320                true
321            }
322        }),
323        is_log_store,
324    }
325}
326
/// Evaluates `$body`, first projecting each named row variable onto the value columns when
/// value indices are present.
///
/// If `$value_indices` evaluates to `Some(value_indices)`, every `$row_var_name` is shadowed by
/// `$row_var_name.project(value_indices)` before `$body` runs. Otherwise `$body` runs with the
/// rows unchanged. `$body` is expanded twice (once per branch), so it must type-check against
/// both the projected and the unprojected row types.
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
339
/// Extract the logic of `StateTableRowStore` from `StateTable`, which serves
/// a similar functionality as a `BTreeMap<TableKey<Bytes>, OwnedRow>`, and
/// provides method to read (`get` and `iter`) over serialized key (or key range)
/// and returns `OwnedRow`, and write (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    /// Local state store that all reads and writes go through.
    state_store: LS,
    /// In-memory copy of every row when rows are preloaded: grouped by vnode, and keyed by the
    /// table key with its vnode prefix stripped. `None` when preloading is disabled.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    /// Serde used to decode (and encode) row values.
    row_serde: Arc<SD>,
    // should be only used for debugging in panic message of handle_mem_table_error
    pk_serde: OrderedRowSerde,

    // Per-vnode min/max key statistics for pruning. `None` when the statistics are disabled.
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    /// When false, vnode stats pruning is in dry-run mode:
    /// we maintain stats and verify pruning correctness but don't actually apply pruning.
    /// Reads still go to cache/storage even when pruning would indicate no results.
    enable_state_table_vnode_stats_pruning: bool,
    // Optional metrics for state table operations
    pub metrics: Option<StateTableMetrics>,
}
362
363impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
364    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
365        if self.vnode_stats.is_none() {
366            return Ok(());
367        }
368
369        // vnode stats must be disabled when all rows are preloaded
370        assert!(self.all_rows.is_none());
371
372        let start_time = Instant::now();
373        let mut stats_map = HashMap::new();
374
375        // Scan each vnode to get min/max keys
376        for vnode in vnode_bitmap.iter_vnodes() {
377            let mut stats = VnodeStatistics::new();
378
379            // Get min key via forward iteration
380            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
381            let read_options = ReadOptions {
382                cache_policy: CachePolicy::Fill(Hint::Low),
383                ..Default::default()
384            };
385
386            let mut iter = self
387                .state_store
388                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
389                .await?;
390            if let Some(item) = iter.try_next().await? {
391                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
392                assert_eq!(vnode, key_vnode);
393                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
394            }
395
396            // Get max key via reverse iteration
397            let mut rev_iter = self
398                .state_store
399                .rev_iter(memcomparable_range_with_vnode, read_options)
400                .await?;
401            if let Some(item) = rev_iter.try_next().await? {
402                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
403                assert_eq!(vnode, key_vnode);
404                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
405            }
406
407            stats_map.insert(vnode, stats);
408        }
409
410        self.vnode_stats = Some(stats_map);
411
412        // avoid flooding e2e-test log
413        if !cfg!(debug_assertions) {
414            info!(
415                table_id = %self.table_id,
416                vnode_count = vnode_bitmap.count_ones(),
417                duration = ?start_time.elapsed(),
418                "finished initializing vnode statistics"
419            );
420        }
421
422        Ok(())
423    }
424
425    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
426        if let Some(rows) = &mut self.all_rows {
427            rows.clear();
428            let start_time = Instant::now();
429            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
430                let state_store = &self.state_store;
431                let row_serde = &self.row_serde;
432                async move {
433                    let mut rows = BTreeMap::new();
434                    let memcomparable_range_with_vnode =
435                        prefixed_range_with_vnode::<Bytes>(.., vnode);
436                    // TODO: set read options
437                    let stream = deserialize_keyed_row_stream::<Bytes>(
438                        state_store
439                            .iter(
440                                memcomparable_range_with_vnode,
441                                ReadOptions {
442                                    prefix_hint: None,
443                                    prefetch_options: Default::default(),
444                                    cache_policy: Default::default(),
445                                },
446                            )
447                            .await?,
448                        &**row_serde,
449                    );
450                    pin_mut!(stream);
451                    while let Some((encoded_key, row)) = stream.try_next().await? {
452                        let key = TableKey(encoded_key);
453                        let (iter_vnode, key) = key.split_vnode_bytes();
454                        assert_eq!(vnode, iter_vnode);
455                        rows.try_insert(key, row).expect("non-duplicated");
456                    }
457                    Ok((vnode, rows)) as StreamExecutorResult<_>
458                }
459            }))
460            .await?
461            .into_iter()
462            .collect();
463            // avoid flooding e2e-test log
464            if !cfg!(debug_assertions) {
465                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(),"finished reloading all rows");
466            }
467        }
468        Ok(())
469    }
470
471    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
472        self.state_store.init(InitOptions::new(epoch)).await?;
473        self.may_reload_all_rows(vnode_bitmap).await?;
474        self.may_load_vnode_stats(vnode_bitmap).await
475    }
476
477    async fn update_vnode_bitmap(
478        &mut self,
479        vnodes: Arc<Bitmap>,
480    ) -> StreamExecutorResult<Arc<Bitmap>> {
481        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
482        self.may_reload_all_rows(&vnodes).await?;
483        self.may_load_vnode_stats(&vnodes).await?;
484
485        Ok(prev_vnodes)
486    }
487
488    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
489        self.state_store.try_flush().await?;
490        Ok(())
491    }
492
493    async fn seal_current_epoch(
494        &mut self,
495        next_epoch: u64,
496        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
497        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
498    ) -> StreamExecutorResult<()> {
499        if let Some((direction, watermarks, serde_type)) = &table_watermarks
500            && let Some(rows) = &mut self.all_rows
501        {
502            match serde_type {
503                WatermarkSerdeType::PkPrefix => {
504                    for vnode_watermark in watermarks {
505                        match direction {
506                            WatermarkDirection::Ascending => {
507                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
508                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
509                                    // split_off returns everything after the given key, including the key.
510                                    *rows = rows.split_off(vnode_watermark.watermark());
511                                }
512                            }
513                            WatermarkDirection::Descending => {
514                                // Turn Exclude(vnode_watermark.watermark()) into Include(next_key(vnode_watermark.watermark())).
515                                let split_off_key = next_key(vnode_watermark.watermark());
516                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
517                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
518                                    // split_off away (Exclude(vnode_watermark.watermark())..) and keep
519                                    // (..Include(vnode_watermark.watermark()))
520                                    rows.split_off(split_off_key.as_slice());
521                                }
522                            }
523                        }
524                    }
525                }
526                WatermarkSerdeType::NonPkPrefix => {
527                    warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written non pk prefix watermark");
528                    self.all_rows = None;
529                }
530                WatermarkSerdeType::Value => {
531                    warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written value watermark");
532                    self.all_rows = None;
533                }
534            }
535        }
536        self.state_store
537            .flush()
538            .instrument(tracing::info_span!("state_table_flush"))
539            .await?;
540        let switch_op_consistency_level =
541            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
542                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
543                StateTableOpConsistencyLevel::ConsistentOldValue => {
544                    consistent_old_value_op(self.row_serde.clone(), false)
545                }
546                StateTableOpConsistencyLevel::LogStoreEnabled => {
547                    consistent_old_value_op(self.row_serde.clone(), true)
548                }
549            });
550        self.state_store.seal_current_epoch(
551            next_epoch,
552            SealCurrentEpochOptions {
553                table_watermarks,
554                switch_op_consistency_level,
555            },
556        );
557        Ok(())
558    }
559}
560
/// Consistency level that ops written to a state table are expected to satisfy.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent: ops are not sanity-checked.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that in `LogStoreEnabled` mode, the state table should also flush and store the old value.
    LogStoreEnabled,
}
573
/// Builder for `StateTableInner`.
///
/// `PreloadAllRow` records whether the row-preloading decision has been made. It is `()`
/// right after `StateTableBuilder::new` and becomes `bool` once
/// `enable_preload_all_rows_by_config` or `forbid_preload_all_rows` is called. `build` is only
/// available in the `bool` state, so the decision cannot be skipped.
pub struct StateTableBuilder<S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    // Extracted intermediate fields (source-agnostic)
    table_id: TableId,
    table_name_for_debug: String,
    table_columns: Vec<ColumnDesc>,
    order_types: Vec<OrderType>,
    pk_indices: Vec<usize>,
    dist_key_in_pk_indices: Vec<usize>,
    vnode_col_idx_in_pk: Option<usize>,
    expected_vnode_count: usize,
    value_indices: Vec<usize>,
    prefix_hint_len: usize,
    retention_seconds: Option<u32>,
    versioned: bool,
    fragment_id: FragmentId,
    clean_watermark_index: Option<usize>,

    // Builder configuration fields
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    // `None` until `with_op_consistency_level` is called.
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    // Only settable on replicated builders (`with_output_column_ids`).
    output_column_ids: Option<Vec<ColumnId>>,
    // `()` before the preload decision is made, `bool` afterwards.
    preload_all_rows: PreloadAllRow,
    // `None` until `enable_vnode_key_stats` is called; treated as disabled by `build`.
    enable_vnode_key_stats: Option<bool>,
    /// When false, vnode stats pruning is in dry-run mode:
    /// we maintain stats and verify pruning correctness but don't actually apply pruning.
    enable_state_table_vnode_stats_pruning: bool,
    metrics: Option<StateTableMetrics>,

    _serde: PhantomData<SD>,
}
605
606impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
607    StateTableBuilder<S, SD, IS_REPLICATED, ()>
608{
609    fn with_preload_all_rows(
610        self,
611        preload_all_rows: bool,
612    ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
613        StateTableBuilder {
614            table_id: self.table_id,
615            table_name_for_debug: self.table_name_for_debug,
616            table_columns: self.table_columns,
617            order_types: self.order_types,
618            pk_indices: self.pk_indices,
619            dist_key_in_pk_indices: self.dist_key_in_pk_indices,
620            vnode_col_idx_in_pk: self.vnode_col_idx_in_pk,
621            expected_vnode_count: self.expected_vnode_count,
622            value_indices: self.value_indices,
623            prefix_hint_len: self.prefix_hint_len,
624            retention_seconds: self.retention_seconds,
625            versioned: self.versioned,
626            fragment_id: self.fragment_id,
627            clean_watermark_index: self.clean_watermark_index,
628            store: self.store,
629            vnodes: self.vnodes,
630            op_consistency_level: self.op_consistency_level,
631            output_column_ids: self.output_column_ids,
632            preload_all_rows,
633            enable_vnode_key_stats: self.enable_vnode_key_stats,
634            enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
635            metrics: self.metrics,
636            _serde: Default::default(),
637        }
638    }
639
640    pub fn enable_preload_all_rows_by_config(
641        self,
642        config: &StreamingConfig,
643    ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
644        let developer = &config.developer;
645        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
646            !developer
647                .mem_preload_state_table_ids_blacklist
648                .contains(&self.table_id.as_raw_id())
649        } else {
650            developer
651                .mem_preload_state_table_ids_whitelist
652                .contains(&self.table_id.as_raw_id())
653        };
654        self.with_preload_all_rows(preload_all_rows)
655    }
656
657    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
658        self.with_preload_all_rows(false)
659    }
660}
661
662impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
663    StateTableBuilder<S, SD, IS_REPLICATED, PreloadAllRow>
664{
665    pub fn with_op_consistency_level(
666        mut self,
667        op_consistency_level: StateTableOpConsistencyLevel,
668    ) -> Self {
669        self.op_consistency_level = Some(op_consistency_level);
670        self
671    }
672
673    pub fn enable_vnode_key_stats(mut self, enable: bool, config: &StreamingConfig) -> Self {
674        self.enable_vnode_key_stats = Some(enable);
675        self.enable_state_table_vnode_stats_pruning =
676            enable && config.developer.enable_state_table_vnode_stats_pruning;
677        self
678    }
679
680    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
681        self.metrics = Some(metrics);
682        self
683    }
684}
685
686impl<S: StateStore, SD: ValueRowSerde, PreloadAllRow>
687    StateTableBuilder<S, SD, true, PreloadAllRow>
688{
689    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
690        self.output_column_ids = Some(output_column_ids);
691        self
692    }
693}
694
695impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
696    StateTableBuilder<S, SD, IS_REPLICATED, bool>
697{
698    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
699        let mut preload_all_rows = self.preload_all_rows;
700        if preload_all_rows
701            && let Err(e) =
702                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
703        {
704            warn!(table_id=%self.table_id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
705            preload_all_rows = false;
706        }
707
708        let should_enable_vnode_key_stats = if preload_all_rows
709            && let Some(enable_vnode_key_stats) = self.enable_vnode_key_stats
710            && enable_vnode_key_stats
711        {
712            false
713        } else {
714            self.enable_vnode_key_stats.unwrap_or(false)
715        };
716        self.build_inner(preload_all_rows, should_enable_vnode_key_stats)
717            .await
718    }
719}
720
721// initialize
722// FIXME(kwannoel): Enforce that none of the constructors here
723// should be used by replicated state table.
724// Apart from from_table_catalog_inner.
725impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
726where
727    S: StateStore,
728    SD: ValueRowSerde,
729{
730    /// Create state table from table catalog and store.
731    ///
732    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
733    #[cfg(any(test, feature = "test"))]
734    pub async fn from_table_catalog(
735        table_catalog: &Table,
736        store: S,
737        vnodes: Option<Arc<Bitmap>>,
738    ) -> Self {
739        StateTableBuilder::new(table_catalog, store, vnodes)
740            .forbid_preload_all_rows()
741            .build()
742            .await
743    }
744
745    /// Create state table from table catalog and store with sanity check disabled.
746    pub async fn from_table_catalog_inconsistent_op(
747        table_catalog: &Table,
748        store: S,
749        vnodes: Option<Arc<Bitmap>>,
750    ) -> Self {
751        StateTableBuilder::new(table_catalog, store, vnodes)
752            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
753            .forbid_preload_all_rows()
754            .build()
755            .await
756    }
757}
758
759impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
760    StateTableBuilder<S, SD, IS_REPLICATED, ()>
761{
762    pub fn new(table_catalog: &Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
763        let table_id = table_catalog.id;
764        let table_columns: Vec<ColumnDesc> = table_catalog
765            .columns
766            .iter()
767            .map(|col| col.column_desc.as_ref().unwrap().into())
768            .collect();
769        let order_types: Vec<OrderType> = table_catalog
770            .pk
771            .iter()
772            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
773            .collect();
774        let dist_key_indices: Vec<usize> = table_catalog
775            .distribution_key
776            .iter()
777            .map(|dist_index| *dist_index as usize)
778            .collect();
779
780        let pk_indices = table_catalog
781            .pk
782            .iter()
783            .map(|col_order| col_order.column_index as usize)
784            .collect_vec();
785
786        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
787        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
788            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
789        } else {
790            table_catalog
791                .get_dist_key_in_pk()
792                .iter()
793                .map(|idx| *idx as usize)
794                .collect()
795        };
796
797        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
798            let vnode_col_idx = *idx as usize;
799            pk_indices.iter().position(|&i| vnode_col_idx == i)
800        });
801        let value_indices = table_catalog
802            .value_indices
803            .iter()
804            .map(|val| *val as usize)
805            .collect_vec();
806        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
807        if clean_watermark_indices.len() > 1 {
808            unimplemented!("multiple clean watermark columns are not supported yet")
809        }
810        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
811
812        Self {
813            table_id,
814            table_name_for_debug: table_catalog.name.clone(),
815            table_columns,
816            order_types,
817            pk_indices,
818            dist_key_in_pk_indices,
819            vnode_col_idx_in_pk,
820            expected_vnode_count: table_catalog.vnode_count(),
821            value_indices,
822            prefix_hint_len: table_catalog.read_prefix_len_hint as usize,
823            retention_seconds: table_catalog.retention_seconds,
824            versioned: table_catalog.version.is_some(),
825            fragment_id: table_catalog.fragment_id,
826            clean_watermark_index,
827            store,
828            vnodes,
829            op_consistency_level: None,
830            output_column_ids: None,
831            preload_all_rows: (),
832            enable_vnode_key_stats: None,
833            enable_state_table_vnode_stats_pruning: false,
834            metrics: None,
835            _serde: Default::default(),
836        }
837    }
838}
839
impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
    StateTableBuilder<S, SD, IS_REPLICATED, bool>
{
    /// Consume the builder and construct the [`StateTableInner`].
    ///
    /// This derives the distribution, pk serde, value serde and watermark serde. It
    /// creates the local state store and restores the committed watermark from the
    /// watermarks persisted for the owned vnodes.
    ///
    /// * `preload_all_rows` - when `true`, the row store's `all_rows` map is
    ///   initialized (empty at this point). Point reads and writes on the row store
    ///   consult it whenever it is `Some`.
    /// * `should_enable_vnode_key_stats` - when `true`, the row store's per-vnode key
    ///   statistics map is initialized (empty at this point).
    ///
    /// # Panics
    ///
    /// Panics in any of the following cases:
    /// - For a replicated table with a non-zero `prefix_hint_len`, some
    ///   distribution-key position in the pk is not covered by the prefix.
    /// - The vnode count of the resulting distribution differs from the expected one.
    /// - `versioned` disagrees with whether the row serde is column-aware.
    /// - For a replicated table, some pk column is missing from the output columns.
    async fn build_inner(
        self,
        preload_all_rows: bool,
        should_enable_vnode_key_stats: bool,
    ) -> StateTableInner<S, SD, IS_REPLICATED> {
        let table_id = self.table_id;
        let table_columns = self.table_columns;
        let order_types = self.order_types;
        let pk_indices = self.pk_indices;
        let dist_key_in_pk_indices = self.dist_key_in_pk_indices;
        let vnode_col_idx_in_pk = self.vnode_col_idx_in_pk;
        let prefix_hint_len = self.prefix_hint_len;
        let metrics = self.metrics;

        // Fall back to `ConsistentOldValue` when no level was configured on the builder.
        let op_consistency_level = self
            .op_consistency_level
            .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue);

        // Presumably only set for replicated tables; an unset value yields an empty list.
        let output_column_ids = self.output_column_ids.unwrap_or_default();

        let data_types: Vec<DataType> = table_columns
            .iter()
            .map(|col| col.data_type.clone())
            .collect();

        // For replicated state tables (used in hash temporal join), the join key (pk_prefix) is
        // guaranteed by the optimizer to cover the distribution key, which is required by
        // `compute_prefix_vnode`. Assert this invariant at build time.
        if IS_REPLICATED && prefix_hint_len > 0 {
            assert!(
                dist_key_in_pk_indices.iter().all(|&d| d < prefix_hint_len),
                "replicated state table: distribution key indices {:?} must all be covered by \
                 prefix_hint_len {}",
                dist_key_in_pk_indices,
                prefix_hint_len,
            );
        }

        let distribution =
            TableDistribution::new(self.vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        // Guard against building the table with a distribution that does not match the catalog.
        assert_eq!(
            distribution.vnode_count(),
            self.expected_vnode_count,
            "vnode count mismatch, scanning table {} under wrong distribution?",
            self.table_name_for_debug,
        );

        // The pk serde encodes the pk columns (in pk order) with their declared order types.
        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = self.value_indices;

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` means the stored value is exactly the full row in column order, so no
        // projection is needed when writing; otherwise keep the explicit value indices.
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices.clone()),
        };

        // The value serde is always built from the explicit value indices.
        let row_serde = Arc::new(SD::new(
            Arc::from_iter(input_value_indices.iter().copied()),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        // Map the state-table level onto the storage-level consistency option. The two
        // consistent variants differ only in the flag passed to `consistent_old_value_op`.
        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match state_table_op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(self.retention_seconds);
        // Replicated tables get dedicated local-store options.
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                self.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                self.fragment_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = self.store.new_local(new_local_options).await;

        // If state table has versioning, that means it supports
        // Schema change. In that case, the row encoding should be column aware as well.
        // Otherwise both will be false.
        // NOTE(kwannoel): Replicated table will follow upstream table's versioning. I'm not sure
        // If ALTER TABLE will propagate to this replicated table as well. Ideally it won't
        assert_eq!(self.versioned, row_serde.kind().is_column_aware());

        // Get info for replicated state table: map each output column id to its position
        // in `output_column_ids`.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        let columns = table_columns;

        // Compute i2o mapping
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any 1 of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices: the table column index of each requested output column.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        // For replicated state tables, pk columns must be explicitly provided by the write caller
        // rather than being filled with None internally via i2o_mapping.
        if IS_REPLICATED {
            assert!(
                pk_indices
                    .iter()
                    .all(|&pk_idx| output_indices.contains(&pk_idx)),
                "all pk columns must be included in output_column_ids for replicated state table"
            );
        }

        // Choose how the clean watermark column is (de)serialized:
        // - first pk column: its pk serde, `PkPrefix`;
        // - any other pk column: its pk serde, `NonPkPrefix`;
        // - non-pk column: a single ascending-ordered serde over its type, `Value`.
        let clean_watermark_index = self.clean_watermark_index;
        let watermark_serde = clean_watermark_index.map(|idx| {
            let pk_idx = pk_indices.iter().position(|&i| i == idx);
            let (watermark_serde, watermark_serde_type) = match pk_idx {
                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
                Some(pk_idx) => (
                    pk_serde.index(pk_idx).into_owned(),
                    WatermarkSerdeType::NonPkPrefix,
                ),
                None => (
                    OrderedRowSerde::new(
                        vec![data_types[idx].clone()],
                        vec![OrderType::ascending()],
                    ),
                    WatermarkSerdeType::Value,
                ),
            };
            (watermark_serde, watermark_serde_type)
        });

        // Restore persisted table watermark: take the largest (ascending order) watermark
        // persisted for any owned vnode. A watermark that fails to deserialize, or that
        // decodes to a NULL datum, is logged and skipped.
        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref() {
            distribution
                .vnodes()
                .iter_vnodes()
                .filter_map(|vnode| {
                    let bytes = local_state_store.get_table_watermark(vnode)?;
                    let datum = deser.deserialize(&bytes).ok().and_then(|row| {
                        assert!(row.len() == 1);
                        row[0].clone()
                    });
                    if datum.is_none() {
                        tracing::error!(
                            ?vnode,
                            watermark = ?bytes,
                            "Failed to deserialize persisted watermark from state store.",
                        );
                    }
                    datum
                })
                .max_by(|a, b| cmp_datum(Some(a), Some(b), OrderType::ascending()))
        } else {
            None
        };

        StateTableInner {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
                // Need to maintain vnode min/max key stats when vnode key pruning is enabled
                vnode_stats: should_enable_vnode_key_stats.then(HashMap::new),
                enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
                metrics,
            },
            store: self.store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_serde,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index,
            on_post_commit: false,
        }
    }
}
1064
1065impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
1066    StateTableBuilder<S, SD, IS_REPLICATED, ()>
1067{
1068    pub fn new_from_storage_table_desc(
1069        table_desc: &StorageTableDesc,
1070        store: S,
1071        vnodes: Option<Arc<Bitmap>>,
1072        fragment_id: FragmentId,
1073    ) -> Self {
1074        let table_id = table_desc.table_id;
1075        let table_columns: Vec<ColumnDesc> =
1076            table_desc.columns.iter().map(ColumnDesc::from).collect();
1077        let order_types: Vec<OrderType> = table_desc
1078            .pk
1079            .iter()
1080            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
1081            .collect();
1082        let pk_indices = table_desc
1083            .pk
1084            .iter()
1085            .map(|col_order| col_order.column_index as usize)
1086            .collect_vec();
1087        let dist_key_in_pk_indices = table_desc
1088            .dist_key_in_pk_indices
1089            .iter()
1090            .map(|&idx| idx as usize)
1091            .collect();
1092        // StorageTableDesc provides vnode_col_idx_in_pk directly (already pk-relative),
1093        // unlike Table which has an absolute column index that needs conversion.
1094        let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
1095        let raw_value_indices = table_desc
1096            .value_indices
1097            .iter()
1098            .map(|val| *val as usize)
1099            .collect_vec();
1100
1101        Self {
1102            table_id,
1103            table_name_for_debug: table_id.to_string(),
1104            table_columns,
1105            order_types,
1106            pk_indices,
1107            dist_key_in_pk_indices,
1108            vnode_col_idx_in_pk,
1109            expected_vnode_count: table_desc.vnode_count(),
1110            value_indices: raw_value_indices,
1111            prefix_hint_len: table_desc.read_prefix_len_hint as usize,
1112            retention_seconds: table_desc.retention_seconds,
1113            versioned: table_desc.versioned,
1114            fragment_id,
1115            clean_watermark_index: None,
1116            store,
1117            vnodes,
1118            op_consistency_level: None,
1119            output_column_ids: None,
1120            preload_all_rows: (),
1121            enable_vnode_key_stats: None,
1122            enable_state_table_vnode_stats_pruning: false,
1123            metrics: None,
1124            _serde: Default::default(),
1125        }
1126    }
1127}
1128
1129impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1130where
1131    S: StateStore,
1132    SD: ValueRowSerde,
1133{
1134    pub fn get_data_types(&self) -> &[DataType] {
1135        &self.data_types
1136    }
1137
1138    pub fn table_id(&self) -> TableId {
1139        self.table_id
1140    }
1141
1142    /// Get the vnode value with given (prefix of) primary key
1143    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
1144        self.distribution
1145            .try_compute_vnode_by_pk_prefix(pk_prefix)
1146            .expect("For streaming, the given prefix must be enough to calculate the vnode")
1147    }
1148
1149    /// Get the vnode value of the given primary key
1150    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
1151        self.distribution.compute_vnode_by_pk(pk)
1152    }
1153
1154    /// NOTE(kwannoel): This is used by backfill.
1155    /// We want to check pk indices of upstream table.
1156    pub fn pk_indices(&self) -> &[usize] {
1157        &self.pk_indices
1158    }
1159
1160    /// Get the indices of the primary key columns in the output columns.
1161    ///
1162    /// Returns `None` if any of the primary key columns is not in the output columns.
1163    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
1164        assert!(IS_REPLICATED);
1165        self.pk_indices
1166            .iter()
1167            .map(|&i| self.output_indices.iter().position(|&j| i == j))
1168            .collect()
1169    }
1170
1171    pub fn pk_serde(&self) -> &OrderedRowSerde {
1172        &self.pk_serde
1173    }
1174
1175    pub fn vnodes(&self) -> &Arc<Bitmap> {
1176        self.distribution.vnodes()
1177    }
1178
1179    pub fn value_indices(&self) -> &Option<Vec<usize>> {
1180        &self.value_indices
1181    }
1182
1183    pub fn is_consistent_op(&self) -> bool {
1184        matches!(
1185            self.op_consistency_level,
1186            StateTableOpConsistencyLevel::ConsistentOldValue
1187                | StateTableOpConsistencyLevel::LogStoreEnabled
1188        )
1189    }
1190
1191    pub fn metrics(&self) -> Option<&StateTableMetrics> {
1192        self.row_store.metrics.as_ref()
1193    }
1194}
1195
1196impl<S, SD> StateTableInner<S, SD, true>
1197where
1198    S: StateStore,
1199    SD: ValueRowSerde,
1200{
1201    /// Create replicated state table from table catalog with output indices
1202    pub async fn new_replicated(
1203        table_catalog: &Table,
1204        store: S,
1205        vnodes: Option<Arc<Bitmap>>,
1206        output_column_ids: Vec<ColumnId>,
1207    ) -> Self {
1208        // TODO: can it be ConsistentOldValue?
1209        // TODO: may enable preload_all_rows
1210        StateTableBuilder::new(table_catalog, store, vnodes)
1211            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
1212            .with_output_column_ids(output_column_ids)
1213            .forbid_preload_all_rows()
1214            .build()
1215            .await
1216    }
1217}
1218
1219// point get
1220impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1221where
1222    S: StateStore,
1223    SD: ValueRowSerde,
1224{
1225    /// Get a single row from state table.
1226    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
1227        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1228        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
1229        match row {
1230            Some(row) => {
1231                if IS_REPLICATED {
1232                    // If the table is replicated, we need to deserialize the row with the output
1233                    // indices.
1234                    let row = row.project(&self.output_indices);
1235                    Ok(Some(row.into_owned_row()))
1236                } else {
1237                    Ok(Some(row))
1238                }
1239            }
1240            None => Ok(None),
1241        }
1242    }
1243
1244    /// Get a raw encoded row from state table.
1245    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
1246        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1247        self.row_store.exists(serialized_pk, prefix_hint).await
1248    }
1249
1250    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
1251        assert!(pk.len() <= self.pk_indices.len());
1252        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
1253    }
1254
1255    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
1256        let serialized_pk = self.serialize_pk(&pk);
1257        let prefix_hint = if should_calculate_prefix_hint(self.prefix_hint_len, pk.len(), false) {
1258            Some(serialized_pk.slice(VirtualNode::SIZE..))
1259        } else {
1260            #[cfg(debug_assertions)]
1261            if self.prefix_hint_len != 0 {
1262                warn!(
1263                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
1264                );
1265            }
1266            None
1267        };
1268        (serialized_pk, prefix_hint)
1269    }
1270}
1271
1272impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1273    async fn get(
1274        &self,
1275        key_bytes: TableKey<Bytes>,
1276        prefix_hint: Option<Bytes>,
1277    ) -> StreamExecutorResult<Option<OwnedRow>> {
1278        if let Some(m) = &self.metrics {
1279            m.get_count.inc();
1280        }
1281        if let Some(rows) = &self.all_rows {
1282            let (vnode, key) = key_bytes.split_vnode_bytes();
1283            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
1284        }
1285
1286        // Try to prune using vnode statistics
1287        let should_prune = if let Some(stats) = &self.vnode_stats
1288            && let (vnode, key) = key_bytes.split_vnode_bytes()
1289            && let Some(vnode_stat) = stats.get(&vnode)
1290            && vnode_stat.can_prune(&key)
1291        {
1292            if let Some(m) = &self.metrics {
1293                m.get_vnode_pruned_count.inc();
1294            }
1295            true
1296        } else {
1297            false
1298        };
1299
1300        if should_prune && self.enable_state_table_vnode_stats_pruning {
1301            return Ok(None);
1302        }
1303
1304        let read_options = ReadOptions {
1305            prefix_hint,
1306            cache_policy: CachePolicy::Fill(Hint::Normal),
1307            ..Default::default()
1308        };
1309
1310        let result = self
1311            .state_store
1312            .on_key_value(key_bytes, read_options, move |_, value| {
1313                let row = self.row_serde.deserialize(value)?;
1314                Ok(OwnedRow::new(row))
1315            })
1316            .await
1317            .map_err(Into::<StreamExecutorError>::into)?;
1318
1319        // In dry-run mode, verify that pruning would have been correct
1320        if should_prune && result.is_some() {
1321            tracing::warn!(
1322                table_id = %self.table_id,
1323                "vnode stats pruning dry run fails for get. This will not affect correctness."
1324            );
1325        }
1326
1327        Ok(result)
1328    }
1329
1330    async fn exists(
1331        &self,
1332        key_bytes: TableKey<Bytes>,
1333        prefix_hint: Option<Bytes>,
1334    ) -> StreamExecutorResult<bool> {
1335        if let Some(m) = &self.metrics {
1336            m.get_count.inc();
1337        }
1338        if let Some(rows) = &self.all_rows {
1339            let (vnode, key) = key_bytes.split_vnode_bytes();
1340            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
1341        }
1342
1343        // Try to prune using vnode statistics
1344        let should_prune = if let Some(stats) = &self.vnode_stats
1345            && let (vnode, key) = key_bytes.split_vnode_bytes()
1346            && let Some(vnode_stat) = stats.get(&vnode)
1347            && vnode_stat.can_prune(&key)
1348        {
1349            if let Some(m) = &self.metrics {
1350                m.get_vnode_pruned_count.inc();
1351            }
1352            true
1353        } else {
1354            false
1355        };
1356
1357        if should_prune && self.enable_state_table_vnode_stats_pruning {
1358            return Ok(false);
1359        }
1360
1361        let read_options = ReadOptions {
1362            prefix_hint,
1363            cache_policy: CachePolicy::Fill(Hint::Normal),
1364            ..Default::default()
1365        };
1366        let result = self
1367            .state_store
1368            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
1369            .await?;
1370        let exists = result.is_some();
1371
1372        // In dry-run mode, verify that pruning would have been correct
1373        if should_prune && exists {
1374            tracing::warn!(
1375                table_id = %self.table_id,
1376                "vnode stats pruning dry run fails for exists. This will not affect correctness."
1377            );
1378        }
1379
1380        Ok(exists)
1381    }
1382}
1383
1384/// A callback struct returned from [`StateTableInner::commit`].
1385///
1386/// Introduced to support single barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
1387/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
1388/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
1389///
1390/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
1391/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
1392/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
1393/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit`, to indicate that whether the `StateTablePostCommit` is handled
1394/// properly. On `state_table.commit()`, we will mark the `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
1395/// remark the flag as false, and on `state_table.commit()`, we will assert that the `on_post_commit` must be false. Note that, the `post_yield_barrier`
1396/// should be called for all barriers rather than only for the barrier with update vnode bitmap. In this way, though we don't have scale test for all
1397/// streaming executor, we can ensure that all executor covered by normal e2e test have properly handled the `StateTablePostCommit`.
1398#[must_use]
1399pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
1400where
1401    S: StateStore,
1402    SD: ValueRowSerde,
1403{
1404    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1405}
1406
1407impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
1408where
1409    S: StateStore,
1410    SD: ValueRowSerde,
1411{
1412    /// Returns `Some((new_vnodes, old_vnodes, state_table), keyed_cache_may_stale)` if the vnode bitmap is updated.
1413    ///
1414    /// Note the `keyed_cache_may_stale` only applies to keyed cache. If the executor's cache is not keyed, but will
1415    /// be consumed with all vnodes it owns, the executor may need to ALWAYS clear the cache regardless of this flag.
1416    pub async fn post_yield_barrier(
1417        mut self,
1418        new_vnodes: Option<Arc<Bitmap>>,
1419    ) -> StreamExecutorResult<
1420        Option<(
1421            (
1422                Arc<Bitmap>,
1423                Arc<Bitmap>,
1424                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1425            ),
1426            bool,
1427        )>,
1428    > {
1429        self.inner.on_post_commit = false;
1430        Ok(if let Some(new_vnodes) = new_vnodes {
1431            let (old_vnodes, keyed_cache_may_stale) =
1432                self.update_vnode_bitmap(new_vnodes.clone()).await?;
1433            Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
1434        } else {
1435            None
1436        })
1437    }
1438
1439    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
1440        &*self.inner
1441    }
1442
1443    /// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
1444    async fn update_vnode_bitmap(
1445        &mut self,
1446        new_vnodes: Arc<Bitmap>,
1447    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
1448        let prev_vnodes = self
1449            .inner
1450            .row_store
1451            .update_vnode_bitmap(new_vnodes.clone())
1452            .await?;
1453        assert_eq!(
1454            &prev_vnodes,
1455            self.inner.vnodes(),
1456            "state table and state store vnode bitmap mismatches"
1457        );
1458
1459        if self.inner.distribution.is_singleton() {
1460            assert_eq!(
1461                &new_vnodes,
1462                self.inner.vnodes(),
1463                "should not update vnode bitmap for singleton table"
1464            );
1465        }
1466        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());
1467
1468        let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);
1469
1470        if keyed_cache_may_stale {
1471            self.inner.pending_watermark = None;
1472        }
1473
1474        Ok((
1475            self.inner.distribution.update_vnode_bitmap(new_vnodes),
1476            keyed_cache_may_stale,
1477        ))
1478    }
1479}
1480
1481// write
1482impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1483    fn handle_mem_table_error(&self, e: StorageError) {
1484        let e = match e.into_inner() {
1485            ErrorKind::MemTable(e) => e,
1486            _ => unreachable!("should only get memtable error"),
1487        };
1488        match *e {
1489            MemTableError::InconsistentOperation { key, prev, new, .. } => {
1490                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
1491                panic!(
1492                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
1493                    self.table_id,
1494                    vnode,
1495                    &key,
1496                    prev.debug_fmt(&*self.row_serde),
1497                    new.debug_fmt(&*self.row_serde),
1498                )
1499            }
1500        }
1501    }
1502
1503    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
1504        insane_mode_discard_point!();
1505        let value_bytes = self.row_serde.serialize(&value).into();
1506
1507        let (vnode, key_without_vnode) = key.split_vnode_bytes();
1508
1509        // Update vnode statistics (skip if all_rows is present)
1510        if self.all_rows.is_none()
1511            && let Some(stats) = &mut self.vnode_stats
1512            && let Some(vnode_stat) = stats.get_mut(&vnode)
1513        {
1514            vnode_stat.update_with_key(&key_without_vnode);
1515        }
1516
1517        if let Some(rows) = &mut self.all_rows {
1518            rows.get_mut(&vnode)
1519                .expect("covered vnode")
1520                .insert(key_without_vnode, value.into_owned_row());
1521        }
1522        self.state_store
1523            .insert(key, value_bytes, None)
1524            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1525    }
1526
1527    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
1528        insane_mode_discard_point!();
1529        let value_bytes = self.row_serde.serialize(value).into();
1530
1531        let (vnode, key_without_vnode) = key.split_vnode_bytes();
1532
1533        if self.all_rows.is_none()
1534            && let Some(stats) = &mut self.vnode_stats
1535            && let Some(vnode_stat) = stats.get_mut(&vnode)
1536        {
1537            vnode_stat.update_with_key(&key_without_vnode);
1538        }
1539
1540        if let Some(rows) = &mut self.all_rows {
1541            rows.get_mut(&vnode)
1542                .expect("covered vnode")
1543                .remove(&key_without_vnode);
1544        }
1545        self.state_store
1546            .delete(key, value_bytes)
1547            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1548    }
1549
1550    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
1551        insane_mode_discard_point!();
1552        let new_value_bytes = self.row_serde.serialize(&new_value).into();
1553        let old_value_bytes = self.row_serde.serialize(old_value).into();
1554
1555        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();
1556
1557        // Update does not change the key, so statistics remain valid (skip if all_rows is present)
1558        // But we update to ensure consistency
1559        if self.all_rows.is_none()
1560            && let Some(stats) = &mut self.vnode_stats
1561            && let Some(vnode_stat) = stats.get_mut(&vnode)
1562        {
1563            vnode_stat.update_with_key(&key_without_vnode);
1564        }
1565
1566        if let Some(rows) = &mut self.all_rows {
1567            rows.get_mut(&vnode)
1568                .expect("covered vnode")
1569                .insert(key_without_vnode, new_value.into_owned_row());
1570        }
1571        self.state_store
1572            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
1573            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1574    }
1575}
1576
1577impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1578where
1579    S: StateStore,
1580    SD: ValueRowSerde,
1581{
1582    /// Insert a row into state table. Must provide a full row corresponding to the column desc of
1583    /// the table.
1584    pub fn insert(&mut self, value: impl Row) {
1585        let pk_indices = &self.pk_indices;
1586        let pk = (&value).project(pk_indices);
1587
1588        let key_bytes = self.serialize_pk(&pk);
1589        dispatch_value_indices!(&self.value_indices, [value], {
1590            self.row_store.insert(key_bytes, value)
1591        })
1592    }
1593
1594    /// Delete a row from state table. Must provide a full row of old value corresponding to the
1595    /// column desc of the table.
1596    pub fn delete(&mut self, old_value: impl Row) {
1597        let pk_indices = &self.pk_indices;
1598        let pk = (&old_value).project(pk_indices);
1599
1600        let key_bytes = self.serialize_pk(&pk);
1601        dispatch_value_indices!(&self.value_indices, [old_value], {
1602            self.row_store.delete(key_bytes, old_value)
1603        })
1604    }
1605
1606    /// Update a row. The old and new value should have the same pk.
1607    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
1608        let old_pk = (&old_value).project(self.pk_indices());
1609        let new_pk = (&new_value).project(self.pk_indices());
1610        debug_assert!(
1611            Row::eq(&old_pk, new_pk),
1612            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
1613            self.table_id
1614        );
1615
1616        let key_bytes = self.serialize_pk(&new_pk);
1617        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
1618            self.row_store.update(key_bytes, old_value, new_value)
1619        })
1620    }
1621
1622    /// Write a record into state table. Must have the same schema with the table.
1623    pub fn write_record(&mut self, record: Record<impl Row>) {
1624        match record {
1625            Record::Insert { new_row } => self.insert(new_row),
1626            Record::Delete { old_row } => self.delete(old_row),
1627            Record::Update { old_row, new_row } => self.update(old_row, new_row),
1628        }
1629    }
1630
1631    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
1632        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
1633    }
1634
1635    /// Write batch with a `StreamChunk` which should have the same schema with the table.
1636    // allow(izip, which use zip instead of zip_eq)
1637    #[allow(clippy::disallowed_methods)]
1638    pub fn write_chunk(&mut self, chunk: StreamChunk) {
1639        let chunk = if IS_REPLICATED {
1640            self.fill_non_output_indices(chunk)
1641        } else {
1642            chunk
1643        };
1644
1645        let vnodes = self
1646            .distribution
1647            .compute_chunk_vnode(&chunk, &self.pk_indices);
1648
1649        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
1650            let Some((op, row)) = optional_row else {
1651                continue;
1652            };
1653            let pk = row.project(&self.pk_indices);
1654            let vnode = vnodes[idx];
1655            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
1656            match op {
1657                Op::Insert | Op::UpdateInsert => {
1658                    dispatch_value_indices!(&self.value_indices, [row], {
1659                        self.row_store.insert(key_bytes, row);
1660                    });
1661                }
1662                Op::Delete | Op::UpdateDelete => {
1663                    dispatch_value_indices!(&self.value_indices, [row], {
1664                        self.row_store.delete(key_bytes, row);
1665                    });
1666                }
1667            }
1668        }
1669    }
1670
1671    /// Update watermark for state cleaning.
1672    ///
1673    /// # Arguments
1674    ///
1675    /// * `watermark` - Latest watermark received.
1676    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1677        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1678        self.pending_watermark = Some(watermark);
1679    }
1680
1681    /// Get the committed watermark of the state table. Watermarks should be fed into the state
1682    /// table through `update_watermark` method.
1683    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1684        self.committed_watermark.as_ref()
1685    }
1686
1687    pub async fn commit(
1688        &mut self,
1689        new_epoch: EpochPair,
1690    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1691        self.commit_inner(new_epoch, None).await
1692    }
1693
1694    #[cfg(test)]
1695    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1696        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1697    }
1698
1699    pub async fn commit_assert_no_update_vnode_bitmap(
1700        &mut self,
1701        new_epoch: EpochPair,
1702    ) -> StreamExecutorResult<()> {
1703        let post_commit = self.commit_inner(new_epoch, None).await?;
1704        post_commit.post_yield_barrier(None).await?;
1705        Ok(())
1706    }
1707
1708    pub async fn commit_may_switch_consistent_op(
1709        &mut self,
1710        new_epoch: EpochPair,
1711        op_consistency_level: StateTableOpConsistencyLevel,
1712    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1713        if self.op_consistency_level != op_consistency_level {
1714            // avoid flooding e2e-test log
1715            if !cfg!(debug_assertions) {
1716                info!(
1717                    ?new_epoch,
1718                    prev_op_consistency_level = ?self.op_consistency_level,
1719                    ?op_consistency_level,
1720                    table_id = %self.table_id,
1721                    "switch to new op consistency level"
1722                );
1723            }
1724            self.commit_inner(new_epoch, Some(op_consistency_level))
1725                .await
1726        } else {
1727            self.commit_inner(new_epoch, None).await
1728        }
1729    }
1730
1731    async fn commit_inner(
1732        &mut self,
1733        new_epoch: EpochPair,
1734        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1735    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1736        assert!(!self.on_post_commit);
1737        assert_eq!(
1738            self.epoch.expect("should only be called after init").curr,
1739            new_epoch.prev
1740        );
1741        if let Some(new_consistency_level) = switch_consistent_op {
1742            assert_ne!(self.op_consistency_level, new_consistency_level);
1743            self.op_consistency_level = new_consistency_level;
1744        }
1745        trace!(
1746            table_id = %self.table_id,
1747            epoch = ?self.epoch,
1748            "commit state table"
1749        );
1750
1751        let table_watermarks = self.commit_pending_watermark();
1752        self.row_store
1753            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1754            .await?;
1755        self.epoch = Some(new_epoch);
1756
1757        self.on_post_commit = true;
1758        Ok(StateTablePostCommit { inner: self })
1759    }
1760
1761    /// Commit pending watermark and return vnode bitmap-watermark pairs to seal.
1762    fn commit_pending_watermark(
1763        &mut self,
1764    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1765        let watermark = self.pending_watermark.take()?;
1766        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1767
1768        assert!(
1769            !self.pk_indices().is_empty(),
1770            "see pending watermark on empty pk"
1771        );
1772        let (watermark_serializer, watermark_type) = self
1773            .watermark_serde
1774            .as_ref()
1775            .expect("watermark serde should be initialized to commit watermark");
1776        let watermark_suffix =
1777            serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
1778        let vnode_watermark = VnodeWatermark::new(
1779            self.vnodes().clone(),
1780            Bytes::copy_from_slice(watermark_suffix.as_ref()),
1781        );
1782        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");
1783
1784        let order_type = watermark_serializer.get_order_types().get(0).unwrap();
1785        let direction = if order_type.is_ascending() {
1786            WatermarkDirection::Ascending
1787        } else {
1788            WatermarkDirection::Descending
1789        };
1790
1791        self.committed_watermark = Some(watermark);
1792        Some((direction, vec![vnode_watermark], *watermark_type))
1793    }
1794
1795    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1796        self.row_store.try_flush().await?;
1797        Ok(())
1798    }
1799}
1800
1801// Manually expand trait alias for better IDE experience.
1802pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
1803impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1804
1805pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
1806impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1807
1808pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
1809impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1810
1811pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;
1812
1813pub trait FromVnodeBytes {
1814    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
1815}
1816
1817impl FromVnodeBytes for Bytes {
1818    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
1819        prefix_slice_with_vnode(vnode, bytes)
1820    }
1821}
1822
1823impl FromVnodeBytes for () {
1824    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
1825}
1826
1827// Iterator functions
1828impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1829where
1830    S: StateStore,
1831    SD: ValueRowSerde,
1832{
1833    /// This function scans rows from the relational table with specific `pk_range` under the same
1834    /// `vnode`.
1835    pub async fn iter_with_vnode(
1836        &self,
1837
1838        // Optional vnode that returns an iterator only over the given range under that vnode.
1839        // For now, we require this parameter, and will panic. In the future, when `None`, we can
1840        // iterate over each vnode that the `StateTableInner` owns.
1841        vnode: VirtualNode,
1842        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1843        prefetch_options: PrefetchOptions,
1844    ) -> StreamExecutorResult<impl RowStream<'_>> {
1845        Ok(self
1846            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1847            .await?
1848            .map_ok(|(_, row)| {
1849                if IS_REPLICATED {
1850                    row.project(&self.output_indices).into_owned_row()
1851                } else {
1852                    row
1853                }
1854            }))
1855    }
1856
1857    pub async fn iter_keyed_row_with_vnode(
1858        &self,
1859        vnode: VirtualNode,
1860        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1861        prefetch_options: PrefetchOptions,
1862    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1863        Ok(self
1864            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1865            .await?
1866            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1867    }
1868}
1869
1870impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1871    // The lowest-level API.
1872    /// Middle-level APIs:
1873    /// - [`StateTableInner::iter_with_prefix_inner`]
1874    /// - [`StateTableInner::iter_kv_with_pk_range`]
1875    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1876        &self,
1877        vnode: VirtualNode,
1878        (start, end): (Bound<Bytes>, Bound<Bytes>),
1879        prefix_hint: Option<Bytes>,
1880        prefetch_options: PrefetchOptions,
1881    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1882        if let Some(m) = &self.metrics {
1883            m.iter_count.inc();
1884        }
1885        // Check if we can prune the entire range using vnode statistics
1886        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1887            &self.vnode_stats
1888            && let Some(vnode_stat) = stats.get(&vnode)
1889        {
1890            match vnode_stat.pruned_key_range(&start, &end) {
1891                Some((new_start, new_end)) => {
1892                    if self.enable_state_table_vnode_stats_pruning {
1893                        (new_start, new_end, false)
1894                    } else {
1895                        // In dry-run mode, we don't apply pruning but verify correctness
1896                        (start, end, false)
1897                    }
1898                }
1899                None => {
1900                    if let Some(m) = &self.metrics {
1901                        m.iter_vnode_pruned_count.inc();
1902                    }
1903                    // Mark that we should prune entirely, but handle dry-run below
1904                    (start.clone(), end.clone(), true)
1905                }
1906            }
1907        } else {
1908            (start, end, false)
1909        };
1910
1911        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
1912            return Ok(futures::future::Either::Left(futures::stream::empty()));
1913        }
1914
1915        let table_id = self.table_id;
1916        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
1917            // Only log when in dry-run mode and we would have pruned but got results
1918            if should_prune_entirely && result.is_ok() {
1919                tracing::warn!(
1920                    table_id = %table_id,
1921                    "vnode stats pruning dry run fails for iter. This will not affect correctness."
1922                );
1923            }
1924        };
1925
1926        if let Some(rows) = &self.all_rows {
1927            return Ok(futures::future::Either::Right(
1928                futures::future::Either::Left(
1929                    futures::stream::iter(
1930                        rows.get(&vnode)
1931                            .expect("covered vnode")
1932                            .range((pruned_start, pruned_end))
1933                            .map(move |(key, value)| {
1934                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1935                            }),
1936                    )
1937                    .inspect(inspect_fn),
1938                ),
1939            ));
1940        }
1941        let read_options = ReadOptions {
1942            prefix_hint,
1943            prefetch_options,
1944            cache_policy: CachePolicy::Fill(Hint::Normal),
1945        };
1946
1947        Ok(futures::future::Either::Right(
1948            futures::future::Either::Right(
1949                deserialize_keyed_row_stream(
1950                    self.state_store
1951                        .iter(
1952                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1953                            read_options,
1954                        )
1955                        .await?,
1956                    &*self.row_serde,
1957                )
1958                .inspect(inspect_fn),
1959            ),
1960        ))
1961    }
1962
1963    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1964        &self,
1965        vnode: VirtualNode,
1966        (start, end): (Bound<Bytes>, Bound<Bytes>),
1967        prefix_hint: Option<Bytes>,
1968        prefetch_options: PrefetchOptions,
1969    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1970        if let Some(m) = &self.metrics {
1971            m.iter_count.inc();
1972        }
1973        // Check if we can prune the entire range using vnode statistics
1974        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1975            &self.vnode_stats
1976            && let Some(vnode_stat) = stats.get(&vnode)
1977        {
1978            match vnode_stat.pruned_key_range(&start, &end) {
1979                Some((new_start, new_end)) => {
1980                    if self.enable_state_table_vnode_stats_pruning {
1981                        (new_start, new_end, false)
1982                    } else {
1983                        // In dry-run mode, we don't apply pruning but verify correctness
1984                        (start, end, false)
1985                    }
1986                }
1987                None => {
1988                    if let Some(m) = &self.metrics {
1989                        m.iter_vnode_pruned_count.inc();
1990                    }
1991                    // Mark that we should prune entirely, but handle dry-run below
1992                    (start, end, true)
1993                }
1994            }
1995        } else {
1996            (start, end, false)
1997        };
1998
1999        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
2000            return Ok(futures::future::Either::Left(futures::stream::empty()));
2001        }
2002
2003        let table_id = self.table_id;
2004        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
2005            // Only log when in dry-run mode and we would have pruned but got results
2006            if should_prune_entirely && result.is_ok() {
2007                tracing::warn!(
2008                    table_id = %table_id,
2009                    "vnode stats pruning dry run fails for rev_iter. This will not affect correctness."
2010                );
2011            }
2012        };
2013
2014        if let Some(rows) = &self.all_rows {
2015            return Ok(futures::future::Either::Right(
2016                futures::future::Either::Left(
2017                    futures::stream::iter(
2018                        rows.get(&vnode)
2019                            .expect("covered vnode")
2020                            .range((pruned_start, pruned_end))
2021                            .rev()
2022                            .map(move |(key, value)| {
2023                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
2024                            }),
2025                    )
2026                    .inspect(inspect_fn),
2027                ),
2028            ));
2029        }
2030        let read_options = ReadOptions {
2031            prefix_hint,
2032            prefetch_options,
2033            cache_policy: CachePolicy::Fill(Hint::Normal),
2034        };
2035
2036        Ok(futures::future::Either::Right(
2037            futures::future::Either::Right(
2038                deserialize_keyed_row_stream(
2039                    self.state_store
2040                        .rev_iter(
2041                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
2042                            read_options,
2043                        )
2044                        .await?,
2045                    &*self.row_serde,
2046                )
2047                .inspect(inspect_fn),
2048            ),
2049        ))
2050    }
2051}
2052
2053impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
2054where
2055    S: StateStore,
2056    SD: ValueRowSerde,
2057{
2058    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
2059    /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
2060    /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
2061    pub async fn iter_with_prefix(
2062        &self,
2063        pk_prefix: impl Row,
2064        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2065        prefetch_options: PrefetchOptions,
2066    ) -> StreamExecutorResult<impl RowStream<'_>> {
2067        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
2068            .await?;
2069        Ok(stream.map_ok(|(_, row)| {
2070            if IS_REPLICATED {
2071                row.project(&self.output_indices).into_owned_row()
2072            } else {
2073                row
2074            }
2075        }))
2076    }
2077
2078    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
2079    /// `vnode`, and filters out rows based on watermarks. It calls `iter_with_prefix` and further filters rows
2080    /// based on the table watermark retrieved from the state store.
2081    ///
2082    /// The caller must ensure that `clean_watermark_index` is set before calling this method, otherwise it will return all rows without filtering.
2083    pub async fn iter_with_prefix_respecting_watermark(
2084        &self,
2085        pk_prefix: impl Row,
2086        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2087        prefetch_options: PrefetchOptions,
2088    ) -> StreamExecutorResult<BoxedRowStream<'_>> {
2089        let vnode = self.compute_prefix_vnode(&pk_prefix);
2090        let Some(clean_watermark_index) = self.clean_watermark_index else {
2091            return self
2092                .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2093                .await
2094                .map(|s| s.boxed());
2095        };
2096        let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
2097            return Err(StreamExecutorError::from(anyhow!(
2098                "Missing watermark serde"
2099            )));
2100        };
2101        // Fast path. TableWatermarksIndex::rewrite_range_with_table_watermark has already filtered the rows.
2102        if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
2103            return self
2104                .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2105                .await
2106                .map(|s| s.boxed());
2107        }
2108
2109        let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
2110        let Some(watermark_bytes) = watermark_bytes else {
2111            return self
2112                .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2113                .await
2114                .map(|s| s.boxed());
2115        };
2116        let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
2117        if watermark_row.len() != 1 {
2118            return Err(StreamExecutorError::from(format!(
2119                "Watermark row should have exactly 1 column, got {}",
2120                watermark_row.len()
2121            )));
2122        }
2123        let watermark_value = watermark_row[0].clone();
2124        // StateTableInner::update_watermark should ensure that the watermark is not NULL
2125        if watermark_value.is_none() {
2126            return Err(StreamExecutorError::from(anyhow!(
2127                "Watermark cannot be NULL"
2128            )));
2129        }
2130        let order_type = watermark_serde.get_order_types().get(0).ok_or_else(|| {
2131            StreamExecutorError::from(anyhow!(
2132                "Watermark serde should have at least one order type"
2133            ))
2134        })?;
2135
2136        let direction = if order_type.is_ascending() {
2137            WatermarkDirection::Ascending
2138        } else {
2139            WatermarkDirection::Descending
2140        };
2141        let clean_watermark_index_in_pk = self
2142            .pk_indices
2143            .iter()
2144            .position(|&i| i == clean_watermark_index);
2145        let clean_watermark_index_in_value = match &self.value_indices {
2146            Some(value_indices) => value_indices
2147                .iter()
2148                .position(|idx| *idx == clean_watermark_index)
2149                .ok_or_else(|| {
2150                    StreamExecutorError::from(anyhow!(
2151                        "clean watermark column index {} is not included in table value indices {:?}",
2152                        clean_watermark_index,
2153                        value_indices
2154                    ))
2155                })?,
2156            None => clean_watermark_index,
2157        };
2158
2159        let stream = self
2160            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
2161            .await?
2162            .try_filter_map(move |(pk, row)| {
2163                let should_filter =  match watermark_type {
2164                    WatermarkSerdeType::PkPrefix => unreachable!(),
2165                    WatermarkSerdeType::NonPkPrefix => {
2166                        let table_key = TableKey(pk);
2167                        let (vnode, key) = table_key.split_vnode();
2168                        let pk_cols = self.pk_serde
2169                        .deserialize(key)
2170                        .unwrap_or_else(|e| {
2171                            panic!("Failed to deserialize table {} vnode {:?} key {:?} error: {:?}", self.table_id(), vnode, key, e.as_report());
2172                        });
2173                        direction.datum_filter_by_watermark(
2174                            pk_cols.datum_at(clean_watermark_index_in_pk.unwrap()),
2175                            &watermark_value,
2176                            *order_type,
2177                        )
2178                    },
2179                    WatermarkSerdeType::Value => {
2180                        direction.datum_filter_by_watermark(
2181                            row.datum_at(clean_watermark_index_in_value),
2182                            &watermark_value,
2183                            *order_type,
2184                        )
2185                    }
2186                };
2187                if should_filter {
2188                    ready(Ok(None))
2189                } else {
2190                    ready(Ok(Some(row)))
2191                }
2192            });
2193        Ok(stream.boxed())
2194    }
2195
2196    /// Get the row from a state table with only 1 row.
2197    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
2198        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
2199        let stream = self
2200            .iter_with_prefix(row::empty(), sub_range, Default::default())
2201            .await?;
2202        pin_mut!(stream);
2203
2204        if let Some(res) = stream.next().await {
2205            let value = res?.into_owned_row();
2206            assert!(stream.next().await.is_none());
2207            Ok(Some(value))
2208        } else {
2209            Ok(None)
2210        }
2211    }
2212
2213    /// Get the row from a state table with only 1 row, and the row has only 1 col.
2214    ///
2215    /// `None` can mean either the row is never persisted, or is a persisted `NULL`,
2216    /// which does not matter in the use case.
2217    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
2218        Ok(self
2219            .get_from_one_row_table()
2220            .await?
2221            .and_then(|row| row[0].clone()))
2222    }
2223
2224    pub async fn iter_keyed_row_with_prefix(
2225        &self,
2226        pk_prefix: impl Row,
2227        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2228        prefetch_options: PrefetchOptions,
2229    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2230        Ok(
2231            self.iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
2232                .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2233        )
2234    }
2235
2236    pub async fn rev_iter_keyed_row_with_prefix(
2237        &self,
2238        pk_prefix: impl Row,
2239        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2240        prefetch_options: PrefetchOptions,
2241    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2242        Ok(
2243            self.iter_with_prefix_inner::</* REVERSE */ true, Bytes>(pk_prefix, sub_range, prefetch_options)
2244            .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2245        )
2246    }
2247
2248    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
2249    pub async fn rev_iter_with_prefix(
2250        &self,
2251        pk_prefix: impl Row,
2252        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2253        prefetch_options: PrefetchOptions,
2254    ) -> StreamExecutorResult<impl RowStream<'_>> {
2255        Ok(
2256            self.iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
2257                .await?.map_ok(|(_, row)| row),
2258        )
2259    }
2260
2261    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
2262        &self,
2263        pk_prefix: impl Row,
2264        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2265        prefetch_options: PrefetchOptions,
2266    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
2267        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
2268        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2269
2270        // We assume that all usages of iterating the state table only access a single vnode.
2271        // If this assertion fails, then something must be wrong with the operator implementation or
2272        // the distribution derivation from the optimizer.
2273        let vnode = self.compute_prefix_vnode(&pk_prefix);
2274
2275        // Construct prefix hint for prefix bloom filter.
2276        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
2277        if self.prefix_hint_len != 0 && !IS_REPLICATED {
2278            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
2279        }
2280        let prefix_hint = {
2281            if should_calculate_prefix_hint(self.prefix_hint_len, pk_prefix.len(), true) {
2282                let encoded_prefix_len = self
2283                    .pk_serde
2284                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
2285
2286                Some(Bytes::copy_from_slice(
2287                    &encoded_prefix[..encoded_prefix_len],
2288                ))
2289            } else {
2290                None
2291            }
2292        };
2293
2294        trace!(
2295            table_id = %self.table_id(),
2296            ?prefix_hint, ?pk_prefix,
2297            ?pk_prefix_indices,
2298            iter_direction = if REVERSE { "reverse" } else { "forward" },
2299            "storage_iter_with_prefix"
2300        );
2301
2302        let memcomparable_range =
2303            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
2304
2305        Ok(if REVERSE {
2306            futures::future::Either::Left(
2307                self.row_store
2308                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2309                    .await?,
2310            )
2311        } else {
2312            futures::future::Either::Right(
2313                self.row_store
2314                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2315                    .await?,
2316            )
2317        })
2318    }
2319
2320    /// This function scans raw key-values from the relational table with specific `pk_range` under
2321    /// the same `vnode`.
2322    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
2323        &'a self,
2324        pk_range: &(Bound<impl Row>, Bound<impl Row>),
2325        // Optional vnode that returns an iterator only over the given range under that vnode.
2326        // For now, we require this parameter, and will panic. In the future, when `None`, we can
2327        // iterate over each vnode that the `StateTableInner` owns.
2328        vnode: VirtualNode,
2329        prefetch_options: PrefetchOptions,
2330    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
2331        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
2332
2333        // TODO: provide a trace of useful params.
2334        self.row_store
2335            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
2336            .await
2337    }
2338}
2339
2340fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
2341    iter: impl StateStoreIter + 'a,
2342    deserializer: &'a impl ValueRowSerde,
2343) -> impl PkRowStream<'a, K> {
2344    iter.into_stream(move |(key, value)| {
2345        Ok((
2346            K::copy_from_slice(key.user_key.table_key.as_ref()),
2347            deserializer.deserialize(value).map(OwnedRow::new)?,
2348        ))
2349    })
2350    .map_err(Into::into)
2351}
2352
2353pub fn prefix_range_to_memcomparable(
2354    pk_serde: &OrderedRowSerde,
2355    range: &(Bound<impl Row>, Bound<impl Row>),
2356) -> (Bound<Bytes>, Bound<Bytes>) {
2357    (
2358        start_range_to_memcomparable(pk_serde, &range.0),
2359        end_range_to_memcomparable(pk_serde, &range.1, None),
2360    )
2361}
2362
2363fn prefix_and_sub_range_to_memcomparable(
2364    pk_serde: &OrderedRowSerde,
2365    sub_range: &(Bound<impl Row>, Bound<impl Row>),
2366    pk_prefix: impl Row,
2367) -> (Bound<Bytes>, Bound<Bytes>) {
2368    let (range_start, range_end) = sub_range;
2369    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2370    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2371    let start_range = match range_start {
2372        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
2373        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
2374        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
2375    };
2376    let end_range = match range_end {
2377        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
2378        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
2379        Unbounded => Unbounded,
2380    };
2381    (
2382        start_range_to_memcomparable(pk_serde, &start_range),
2383        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
2384    )
2385}
2386
2387fn start_range_to_memcomparable<R: Row>(
2388    pk_serde: &OrderedRowSerde,
2389    bound: &Bound<R>,
2390) -> Bound<Bytes> {
2391    let serialize_pk_prefix = |pk_prefix: &R| {
2392        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2393        serialize_pk(pk_prefix, &prefix_serializer)
2394    };
2395    match bound {
2396        Unbounded => Unbounded,
2397        Included(r) => {
2398            let serialized = serialize_pk_prefix(r);
2399
2400            Included(serialized)
2401        }
2402        Excluded(r) => {
2403            let serialized = serialize_pk_prefix(r);
2404
2405            start_bound_of_excluded_prefix(&serialized)
2406        }
2407    }
2408}
2409
2410fn end_range_to_memcomparable<R: Row>(
2411    pk_serde: &OrderedRowSerde,
2412    bound: &Bound<R>,
2413    serialized_pk_prefix: Option<Bytes>,
2414) -> Bound<Bytes> {
2415    let serialize_pk_prefix = |pk_prefix: &R| {
2416        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2417        serialize_pk(pk_prefix, &prefix_serializer)
2418    };
2419    match bound {
2420        Unbounded => match serialized_pk_prefix {
2421            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
2422            None => Unbounded,
2423        },
2424        Included(r) => {
2425            let serialized = serialize_pk_prefix(r);
2426            // TODO: may use Included(serialized)?
2427            end_bound_of_prefix(&serialized)
2428        }
2429        Excluded(r) => {
2430            let serialized = serialize_pk_prefix(r);
2431            Excluded(serialized)
2432        }
2433    }
2434}
2435
2436fn fill_non_output_indices(
2437    i2o_mapping: &ColIndexMapping,
2438    data_types: &[DataType],
2439    chunk: StreamChunk,
2440) -> StreamChunk {
2441    let cardinality = chunk.cardinality();
2442    let (ops, columns, vis) = chunk.into_inner();
2443    let mut full_columns = Vec::with_capacity(data_types.len());
2444    for (i, data_type) in data_types.iter().enumerate() {
2445        if let Some(j) = i2o_mapping.try_map(i) {
2446            full_columns.push(columns[j].clone());
2447        } else {
2448            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
2449            column_builder.append_n_null(cardinality);
2450            let column: ArrayRef = column_builder.finish().into();
2451            full_columns.push(column)
2452        }
2453    }
2454    let data_chunk = DataChunk::new(full_columns, vis);
2455    StreamChunk::from_parts(ops, data_chunk)
2456}
2457
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Compares the pretty-printed (`{:#?}`) form of `actual` with the `expect` snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        expect.assert_eq(&format!("{actual:#?}"));
    }

    /// A two-column output chunk is widened to a three-column full schema. Output column 1 goes
    /// to full column 0 and output column 0 goes to full column 2. Full column 1 has no output
    /// counterpart and is filled with NULL.
    #[test]
    fn test_fill_non_output_indices() {
        let full_types = vec![DataType::Int32; 3];
        let output_types = [DataType::Int32, DataType::Int32];

        let output_rows = [OwnedRow::new(vec![Some(222_i32.into()), Some(2_i32.into())])];
        let output_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&output_rows, &output_types),
        );

        // Full-schema index -> output index; `None` marks a column missing from the output.
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &full_types, output_chunk);

        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}
2492        );
2493    }
2494}