risingwave_stream/common/table/
state_table.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::ops::Bound;
18use std::ops::Bound::*;
19use std::sync::Arc;
20use std::time::Instant;
21
22use anyhow::anyhow;
23use bytes::Bytes;
24use either::Either;
25use foyer::Hint;
26use futures::future::{ready, try_join_all};
27use futures::stream::BoxStream;
28use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
29use itertools::Itertools;
30use risingwave_common::array::stream_record::Record;
31use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
32use risingwave_common::bitmap::Bitmap;
33use risingwave_common::catalog::{
34    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
35};
36use risingwave_common::config::StreamingConfig;
37use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
38use risingwave_common::id::FragmentId;
39use risingwave_common::row::{self, OwnedRow, Row, RowExt};
40use risingwave_common::types::{DataType, ScalarImpl};
41use risingwave_common::util::column_index_mapping::ColIndexMapping;
42use risingwave_common::util::epoch::EpochPair;
43use risingwave_common::util::row_serde::OrderedRowSerde;
44use risingwave_common::util::sort_util::{OrderType, cmp_datum};
45use risingwave_common::util::value_encoding::BasicSerde;
46use risingwave_hummock_sdk::HummockReadEpoch;
47use risingwave_hummock_sdk::key::{
48    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
49    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
50};
51use risingwave_hummock_sdk::table_watermark::{
52    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
53};
54use risingwave_pb::catalog::Table;
55use risingwave_pb::plan_common::StorageTableDesc;
56use risingwave_storage::StateStore;
57use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
58use risingwave_storage::hummock::CachePolicy;
59use risingwave_storage::mem_table::MemTableError;
60use risingwave_storage::row_serde::find_columns_by_ids;
61use risingwave_storage::row_serde::row_serde_util::{
62    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode, serialize_row,
63};
64use risingwave_storage::row_serde::value_serde::ValueRowSerde;
65use risingwave_storage::store::*;
66use risingwave_storage::table::{KeyedRow, TableDistribution, should_calculate_prefix_hint};
67use thiserror_ext::AsReport;
68use tracing::{Instrument, trace};
69
70use crate::cache::keyed_cache_may_stale;
71use crate::executor::monitor::streaming_stats::StateTableMetrics;
72use crate::executor::{StreamExecutorError, StreamExecutorResult};
73
/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
///
/// When `crate::consistency::insane()` returns `true`, the expansion returns from the enclosing
/// function with probability 0.3; otherwise it does nothing. Because it expands to a bare
/// `return;`, it can only be used where the enclosing return type is `()`.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        // `&&` short-circuits, so the RNG is only consulted when insane mode is on.
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}
84
85/// Per-vnode statistics for pruning. None means this stat is not maintained.
86/// For each vnode, we maintain the min and max storage table key (excluding the vnode part) observed in the vnode.
87/// The stat won't differentiate between tombstone and normal keys.
88struct VnodeStatistics {
89    min_key: Option<Bytes>,
90    max_key: Option<Bytes>,
91}
92
93impl VnodeStatistics {
94    fn new() -> Self {
95        Self {
96            min_key: None,
97            max_key: None,
98        }
99    }
100
101    fn update_with_key(&mut self, key: &Bytes) {
102        if let Some(min) = &self.min_key {
103            if key < min {
104                self.min_key = Some(key.clone());
105            }
106        } else {
107            self.min_key = Some(key.clone());
108        }
109
110        if let Some(max) = &self.max_key {
111            if key > max {
112                self.max_key = Some(key.clone());
113            }
114        } else {
115            self.max_key = Some(key.clone());
116        }
117    }
118
119    fn can_prune(&self, key: &Bytes) -> bool {
120        if let Some(min) = &self.min_key
121            && key < min
122        {
123            return true;
124        }
125        if let Some(max) = &self.max_key
126            && key > max
127        {
128            return true;
129        }
130        false
131    }
132
133    fn can_prune_range(&self, start: &Bound<Bytes>, end: &Bound<Bytes>) -> bool {
134        // Check if the range is completely outside vnode bounds
135        if let Some(max) = &self.max_key {
136            match start {
137                Included(s) if s > max => return true,
138                Excluded(s) if s >= max => return true,
139                _ => {}
140            }
141        }
142        if let Some(min) = &self.min_key {
143            match end {
144                Included(e) if e < min => return true,
145                Excluded(e) if e <= min => return true,
146                _ => {}
147            }
148        }
149        false
150    }
151
152    fn pruned_key_range(
153        &self,
154        start: &Bound<Bytes>,
155        end: &Bound<Bytes>,
156    ) -> Option<(Bound<Bytes>, Bound<Bytes>)> {
157        if self.can_prune_range(start, end) {
158            return None;
159        }
160        let new_start = if let Some(min) = &self.min_key {
161            match start {
162                Included(s) if s <= min => Included(min.clone()),
163                Excluded(s) if s < min => Included(min.clone()),
164                _ => start.clone(),
165            }
166        } else {
167            start.clone()
168        };
169
170        let new_end = if let Some(max) = &self.max_key {
171            match end {
172                Included(e) if e >= max => Included(max.clone()),
173                Excluded(e) if e > max => Included(max.clone()),
174                _ => end.clone(),
175            }
176        } else {
177            end.clone()
178        };
179
180        Some((new_start, new_end))
181    }
182}
183
/// `StateTableInner` is the interface accessing relational data in KV(`StateStore`) with
/// row-based encoding.
///
/// Type parameters:
/// - `S`: the state store backend; its `S::Local` store backs the row store.
/// - `SD`: the serde used for the value part of rows (defaults to `BasicSerde`).
/// - `IS_REPLICATED`: `true` for `ReplicatedStateTable`, `false` (the default) for `StateTable`.
pub struct StateTableInner<S, SD = BasicSerde, const IS_REPLICATED: bool = false>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    /// Row-level read/write layer on top of the local state store (`S::Local`).
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data
    store: S,

    /// Current epoch
    /// `None` until `init_epoch` has been called.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the index is based on the all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds vnode bitmap. Only the rows whose vnode of the primary key is in this set will be visible to the
    /// executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    /// Length of the key prefix used as a read hint.
    // NOTE(review): presumably the number of leading pk columns, taken from the table
    // catalog's `read_prefix_len_hint` — confirm against the constructor.
    prefix_hint_len: usize,

    /// Indices of the columns that are stored in the value part of a row, if restricted.
    // NOTE(review): `None` presumably means the whole row is stored — confirm against the
    // constructor.
    value_indices: Option<Vec<usize>>,

    /// The index of the watermark column used for state cleaning in all columns.
    pub clean_watermark_index: Option<usize>,
    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Serializer and serde type for the watermark column.
    watermark_serde: Option<(OrderedRowSerde, WatermarkSerdeType)>,

    /// Data Types
    /// We will need to use to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from replicated state table.
    /// Such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to used them for backfill state.
    pub output_indices: Vec<usize>,

    /// Operation consistency level the table is currently configured with.
    op_consistency_level: StateTableOpConsistencyLevel,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}
254
/// `StateTable` will use `BasicSerde` as default
///
/// This is `StateTableInner` with `IS_REPLICATED` left at its default of `false`.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate upstream shared buffer.
/// Used for `ArrangementBackfill` executor.
///
/// This is `StateTableInner` with `IS_REPLICATED` set to `true`; the value serde `SD` must be
/// chosen by the caller.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
260
261// initialize
262impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
263where
264    S: StateStore,
265    SD: ValueRowSerde,
266{
267    /// In streaming executors, this methods must be called **after** receiving and yielding the first barrier,
268    /// and otherwise, deadlock can be likely to happen.
269    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
270        self.row_store
271            .init(epoch, self.distribution.vnodes())
272            .await?;
273        assert_eq!(None, self.epoch.replace(epoch), "should not init for twice");
274        Ok(())
275    }
276
277    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
278        self.store
279            .try_wait_epoch(
280                HummockReadEpoch::Committed(prev_epoch),
281                TryWaitEpochOptions {
282                    table_id: self.table_id,
283                },
284            )
285            .await
286    }
287
288    pub fn state_store(&self) -> &S {
289        &self.store
290    }
291}
292
293fn consistent_old_value_op(
294    row_serde: Arc<impl ValueRowSerde>,
295    is_log_store: bool,
296) -> OpConsistencyLevel {
297    OpConsistencyLevel::ConsistentOldValue {
298        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
299            if first == second {
300                return true;
301            }
302            let first = match row_serde.deserialize(first) {
303                Ok(rows) => rows,
304                Err(e) => {
305                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
306                    return false;
307                }
308            };
309            let second = match row_serde.deserialize(second) {
310                Ok(rows) => rows,
311                Err(e) => {
312                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
313                    return false;
314                }
315            };
316            if first != second {
317                error!(first = ?first, second = ?second, "sanity check fail");
318                false
319            } else {
320                true
321            }
322        }),
323        is_log_store,
324    }
325}
326
/// Evaluates `$body` with each listed row variable optionally projected onto the value columns.
///
/// - `$value_indices`: an expression matched against `Some(value_indices)`.
/// - `[$row_var_name, ...]`: identifiers of row variables already in scope.
/// - `$body`: the expression to evaluate.
///
/// When `$value_indices` is `Some`, every listed variable is shadowed by
/// `var.project(value_indices)` before `$body` runs; otherwise `$body` sees the variables
/// untouched. `$body` is expanded once per branch, so it is compiled separately against the
/// projected and the unprojected variables.
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                // Shadow the row with its projection onto the value columns.
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
339
/// Extract the logic of `StateTableRowStore` from `StateTable`, which serves
/// a similar functionality as a `BTreeMap<TableKey<Bytes>, OwnedRow>`, and
/// provides method to read (`get` and `iter`) over serialized key (or key range)
/// and returns `OwnedRow`, and write (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    // The local state store all reads and writes go through.
    state_store: LS,
    // When `Some`, an in-memory copy of all rows, grouped by vnode and keyed by the table key
    // with the vnode part stripped. It is (re)loaded from `state_store` by
    // `may_reload_all_rows` and trimmed or dropped by `seal_current_epoch` when watermarks are
    // written. `None` means preloading is disabled.
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    // Serde for the value part of a row; also used to compare old values when consistency
    // checking is switched on in `seal_current_epoch`.
    row_serde: Arc<SD>,
    // should be only used for debugging in panic message of handle_mem_table_error
    pk_serde: OrderedRowSerde,

    // Per-vnode min/max key statistics for pruning
    // `None` means the statistics are not maintained; `may_load_vnode_stats` asserts that this
    // is never combined with preloaded `all_rows`.
    vnode_stats: Option<HashMap<VirtualNode, VnodeStatistics>>,
    /// When false, vnode stats pruning is in dry-run mode:
    /// we maintain stats and verify pruning correctness but don't actually apply pruning.
    /// Reads still go to cache/storage even when pruning would indicate no results.
    enable_state_table_vnode_stats_pruning: bool,
    // Optional metrics for state table operations
    pub metrics: Option<StateTableMetrics>,
}
362
363impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
364    async fn may_load_vnode_stats(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
365        if self.vnode_stats.is_none() {
366            return Ok(());
367        }
368
369        // vnode stats must be disabled when all rows are preloaded
370        assert!(self.all_rows.is_none());
371
372        let start_time = Instant::now();
373        let mut stats_map = HashMap::new();
374
375        // Scan each vnode to get min/max keys
376        for vnode in vnode_bitmap.iter_vnodes() {
377            let mut stats = VnodeStatistics::new();
378
379            // Get min key via forward iteration
380            let memcomparable_range_with_vnode = prefixed_range_with_vnode::<Bytes>(.., vnode);
381            let read_options = ReadOptions {
382                cache_policy: CachePolicy::Fill(Hint::Low),
383                ..Default::default()
384            };
385
386            let mut iter = self
387                .state_store
388                .iter(memcomparable_range_with_vnode.clone(), read_options.clone())
389                .await?;
390            if let Some(item) = iter.try_next().await? {
391                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
392                assert_eq!(vnode, key_vnode);
393                stats.min_key = Some(Bytes::copy_from_slice(key_without_vnode));
394            }
395
396            // Get max key via reverse iteration
397            let mut rev_iter = self
398                .state_store
399                .rev_iter(memcomparable_range_with_vnode, read_options)
400                .await?;
401            if let Some(item) = rev_iter.try_next().await? {
402                let (key_vnode, key_without_vnode) = item.0.user_key.table_key.split_vnode();
403                assert_eq!(vnode, key_vnode);
404                stats.max_key = Some(Bytes::copy_from_slice(key_without_vnode));
405            }
406
407            stats_map.insert(vnode, stats);
408        }
409
410        self.vnode_stats = Some(stats_map);
411
412        // avoid flooding e2e-test log
413        if !cfg!(debug_assertions) {
414            info!(
415                table_id = %self.table_id,
416                vnode_count = vnode_bitmap.count_ones(),
417                duration = ?start_time.elapsed(),
418                "finished initializing vnode statistics"
419            );
420        }
421
422        Ok(())
423    }
424
    /// Reloads the in-memory copy of all rows from the state store, if preloading is enabled.
    ///
    /// Does nothing when `all_rows` is `None`. Otherwise the current contents are cleared and,
    /// for every vnode set in `vnode_bitmap`, the whole vnode range is scanned and deserialized
    /// into a `BTreeMap` keyed by the table key with the vnode part stripped. The per-vnode
    /// scans are driven together through `try_join_all`.
    ///
    /// # Panics
    ///
    /// Panics if an iterated key does not belong to the vnode being scanned, or if the same key
    /// is yielded twice within a vnode.
    ///
    /// # Errors
    ///
    /// Propagates the first error from any per-vnode scan. Note that the previous contents have
    /// already been cleared at that point.
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                // Borrow only the fields the scan needs, so that `rows` (a borrow of
                // `self.all_rows`) can stay mutably borrowed.
                let state_store = &self.state_store;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        // Store rows under the key without its vnode prefix.
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(),"finished reloading all rows");
            }
        }
        Ok(())
    }
470
471    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
472        self.state_store.init(InitOptions::new(epoch)).await?;
473        self.may_reload_all_rows(vnode_bitmap).await?;
474        self.may_load_vnode_stats(vnode_bitmap).await
475    }
476
477    async fn update_vnode_bitmap(
478        &mut self,
479        vnodes: Arc<Bitmap>,
480    ) -> StreamExecutorResult<Arc<Bitmap>> {
481        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
482        self.may_reload_all_rows(&vnodes).await?;
483        self.may_load_vnode_stats(&vnodes).await?;
484
485        Ok(prev_vnodes)
486    }
487
488    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
489        self.state_store.try_flush().await?;
490        Ok(())
491    }
492
493    async fn seal_current_epoch(
494        &mut self,
495        next_epoch: u64,
496        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
497        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
498    ) -> StreamExecutorResult<()> {
499        if let Some((direction, watermarks, serde_type)) = &table_watermarks
500            && let Some(rows) = &mut self.all_rows
501        {
502            match serde_type {
503                WatermarkSerdeType::PkPrefix => {
504                    for vnode_watermark in watermarks {
505                        match direction {
506                            WatermarkDirection::Ascending => {
507                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
508                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
509                                    // split_off returns everything after the given key, including the key.
510                                    *rows = rows.split_off(vnode_watermark.watermark());
511                                }
512                            }
513                            WatermarkDirection::Descending => {
514                                // Turn Exclude(vnode_watermark.watermark()) into Include(next_key(vnode_watermark.watermark())).
515                                let split_off_key = next_key(vnode_watermark.watermark());
516                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
517                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
518                                    // split_off away (Exclude(vnode_watermark.watermark())..) and keep
519                                    // (..Include(vnode_watermark.watermark()))
520                                    rows.split_off(split_off_key.as_slice());
521                                }
522                            }
523                        }
524                    }
525                }
526                WatermarkSerdeType::NonPkPrefix => {
527                    warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written non pk prefix watermark");
528                    self.all_rows = None;
529                }
530                WatermarkSerdeType::Value => {
531                    warn!(table_id = %self.table_id, "table enabled preloading rows got disabled by written value watermark");
532                    self.all_rows = None;
533                }
534            }
535        }
536        self.state_store
537            .flush()
538            .instrument(tracing::info_span!("state_table_flush"))
539            .await?;
540        let switch_op_consistency_level =
541            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
542                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
543                StateTableOpConsistencyLevel::ConsistentOldValue => {
544                    consistent_old_value_op(self.row_serde.clone(), false)
545                }
546                StateTableOpConsistencyLevel::LogStoreEnabled => {
547                    consistent_old_value_op(self.row_serde.clone(), true)
548                }
549            });
550        self.state_store.seal_current_epoch(
551            next_epoch,
552            SealCurrentEpochOptions {
553                table_watermarks,
554                switch_op_consistency_level,
555            },
556        );
557        Ok(())
558    }
559}
560
/// How strictly write operations on a state table are checked against existing data.
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}
573
/// Builder for `StateTableInner`.
///
/// The `PreloadAllRow` type parameter tracks whether the preload decision has been made: the
/// builder starts with `()` and is turned into a `bool`-typed builder by
/// `enable_preload_all_rows_by_config` or `forbid_preload_all_rows`. `build` is only available
/// on the `bool`-typed builder.
pub struct StateTableBuilder<S, SD, const IS_REPLICATED: bool, PreloadAllRow> {
    // Extracted intermediate fields (source-agnostic)
    table_id: TableId,
    table_name_for_debug: String,
    table_columns: Vec<ColumnDesc>,
    order_types: Vec<OrderType>,
    pk_indices: Vec<usize>,
    dist_key_in_pk_indices: Vec<usize>,
    vnode_col_idx_in_pk: Option<usize>,
    expected_vnode_count: usize,
    value_indices: Vec<usize>,
    prefix_hint_len: usize,
    retention_seconds: Option<u32>,
    versioned: bool,
    fragment_id: FragmentId,
    clean_watermark_index: Option<usize>,

    // Builder configuration fields
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    // `None` until `with_op_consistency_level` is called.
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    // `None` until `with_output_column_ids` is called.
    output_column_ids: Option<Vec<ColumnId>>,
    // `()` while undecided, `bool` once the preload decision has been made.
    preload_all_rows: PreloadAllRow,
    // `None` until `enable_vnode_key_stats` is called; `build` treats `None` as `false`.
    enable_vnode_key_stats: Option<bool>,
    /// When false, vnode stats pruning is in dry-run mode:
    /// we maintain stats and verify pruning correctness but don't actually apply pruning.
    enable_state_table_vnode_stats_pruning: bool,
    metrics: Option<StateTableMetrics>,

    // Ties the builder to the value serde `SD` without storing one.
    _serde: PhantomData<SD>,
}
605
606impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
607    StateTableBuilder<S, SD, IS_REPLICATED, ()>
608{
609    fn with_preload_all_rows(
610        self,
611        preload_all_rows: bool,
612    ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
613        StateTableBuilder {
614            table_id: self.table_id,
615            table_name_for_debug: self.table_name_for_debug,
616            table_columns: self.table_columns,
617            order_types: self.order_types,
618            pk_indices: self.pk_indices,
619            dist_key_in_pk_indices: self.dist_key_in_pk_indices,
620            vnode_col_idx_in_pk: self.vnode_col_idx_in_pk,
621            expected_vnode_count: self.expected_vnode_count,
622            value_indices: self.value_indices,
623            prefix_hint_len: self.prefix_hint_len,
624            retention_seconds: self.retention_seconds,
625            versioned: self.versioned,
626            fragment_id: self.fragment_id,
627            clean_watermark_index: self.clean_watermark_index,
628            store: self.store,
629            vnodes: self.vnodes,
630            op_consistency_level: self.op_consistency_level,
631            output_column_ids: self.output_column_ids,
632            preload_all_rows,
633            enable_vnode_key_stats: self.enable_vnode_key_stats,
634            enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
635            metrics: self.metrics,
636            _serde: Default::default(),
637        }
638    }
639
640    pub fn enable_preload_all_rows_by_config(
641        self,
642        config: &StreamingConfig,
643    ) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
644        let developer = &config.developer;
645        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
646            !developer
647                .mem_preload_state_table_ids_blacklist
648                .contains(&self.table_id.as_raw_id())
649        } else {
650            developer
651                .mem_preload_state_table_ids_whitelist
652                .contains(&self.table_id.as_raw_id())
653        };
654        self.with_preload_all_rows(preload_all_rows)
655    }
656
657    pub fn forbid_preload_all_rows(self) -> StateTableBuilder<S, SD, IS_REPLICATED, bool> {
658        self.with_preload_all_rows(false)
659    }
660}
661
662impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool, PreloadAllRow>
663    StateTableBuilder<S, SD, IS_REPLICATED, PreloadAllRow>
664{
665    pub fn with_op_consistency_level(
666        mut self,
667        op_consistency_level: StateTableOpConsistencyLevel,
668    ) -> Self {
669        self.op_consistency_level = Some(op_consistency_level);
670        self
671    }
672
673    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
674        self.output_column_ids = Some(output_column_ids);
675        self
676    }
677
678    pub fn enable_vnode_key_stats(mut self, enable: bool, config: &StreamingConfig) -> Self {
679        self.enable_vnode_key_stats = Some(enable);
680        self.enable_state_table_vnode_stats_pruning =
681            enable && config.developer.enable_state_table_vnode_stats_pruning;
682        self
683    }
684
685    pub fn with_metrics(mut self, metrics: StateTableMetrics) -> Self {
686        self.metrics = Some(metrics);
687        self
688    }
689}
690
691impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
692    StateTableBuilder<S, SD, IS_REPLICATED, bool>
693{
694    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED> {
695        let mut preload_all_rows = self.preload_all_rows;
696        if preload_all_rows
697            && let Err(e) =
698                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
699        {
700            warn!(table_id=%self.table_id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
701            preload_all_rows = false;
702        }
703
704        let should_enable_vnode_key_stats = if preload_all_rows
705            && let Some(enable_vnode_key_stats) = self.enable_vnode_key_stats
706            && enable_vnode_key_stats
707        {
708            false
709        } else {
710            self.enable_vnode_key_stats.unwrap_or(false)
711        };
712        self.build_inner(preload_all_rows, should_enable_vnode_key_stats)
713            .await
714    }
715}
716
717// initialize
718// FIXME(kwannoel): Enforce that none of the constructors here
719// should be used by replicated state table.
720// Apart from from_table_catalog_inner.
721impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
722where
723    S: StateStore,
724    SD: ValueRowSerde,
725{
726    /// Create state table from table catalog and store.
727    ///
728    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
729    #[cfg(any(test, feature = "test"))]
730    pub async fn from_table_catalog(
731        table_catalog: &Table,
732        store: S,
733        vnodes: Option<Arc<Bitmap>>,
734    ) -> Self {
735        StateTableBuilder::new(table_catalog, store, vnodes)
736            .forbid_preload_all_rows()
737            .build()
738            .await
739    }
740
741    /// Create state table from table catalog and store with sanity check disabled.
742    pub async fn from_table_catalog_inconsistent_op(
743        table_catalog: &Table,
744        store: S,
745        vnodes: Option<Arc<Bitmap>>,
746    ) -> Self {
747        StateTableBuilder::new(table_catalog, store, vnodes)
748            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
749            .forbid_preload_all_rows()
750            .build()
751            .await
752    }
753}
754
755impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
756    StateTableBuilder<S, SD, IS_REPLICATED, ()>
757{
758    pub fn new(table_catalog: &Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
759        let table_id = table_catalog.id;
760        let table_columns: Vec<ColumnDesc> = table_catalog
761            .columns
762            .iter()
763            .map(|col| col.column_desc.as_ref().unwrap().into())
764            .collect();
765        let order_types: Vec<OrderType> = table_catalog
766            .pk
767            .iter()
768            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
769            .collect();
770        let dist_key_indices: Vec<usize> = table_catalog
771            .distribution_key
772            .iter()
773            .map(|dist_index| *dist_index as usize)
774            .collect();
775
776        let pk_indices = table_catalog
777            .pk
778            .iter()
779            .map(|col_order| col_order.column_index as usize)
780            .collect_vec();
781
782        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
783        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
784            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
785        } else {
786            table_catalog
787                .get_dist_key_in_pk()
788                .iter()
789                .map(|idx| *idx as usize)
790                .collect()
791        };
792
793        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
794            let vnode_col_idx = *idx as usize;
795            pk_indices.iter().position(|&i| vnode_col_idx == i)
796        });
797        let value_indices = table_catalog
798            .value_indices
799            .iter()
800            .map(|val| *val as usize)
801            .collect_vec();
802        let clean_watermark_indices = table_catalog.get_clean_watermark_column_indices();
803        if clean_watermark_indices.len() > 1 {
804            unimplemented!("multiple clean watermark columns are not supported yet")
805        }
806        let clean_watermark_index = clean_watermark_indices.first().map(|&i| i as usize);
807
808        Self {
809            table_id,
810            table_name_for_debug: table_catalog.name.clone(),
811            table_columns,
812            order_types,
813            pk_indices,
814            dist_key_in_pk_indices,
815            vnode_col_idx_in_pk,
816            expected_vnode_count: table_catalog.vnode_count(),
817            value_indices,
818            prefix_hint_len: table_catalog.read_prefix_len_hint as usize,
819            retention_seconds: table_catalog.retention_seconds,
820            versioned: table_catalog.version.is_some(),
821            fragment_id: table_catalog.fragment_id,
822            clean_watermark_index,
823            store,
824            vnodes,
825            op_consistency_level: None,
826            output_column_ids: None,
827            preload_all_rows: (),
828            enable_vnode_key_stats: None,
829            enable_state_table_vnode_stats_pruning: false,
830            metrics: None,
831            _serde: Default::default(),
832        }
833    }
834}
835
836impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
837    StateTableBuilder<S, SD, IS_REPLICATED, bool>
838{
839    async fn build_inner(
840        self,
841        preload_all_rows: bool,
842        should_enable_vnode_key_stats: bool,
843    ) -> StateTableInner<S, SD, IS_REPLICATED> {
844        let table_id = self.table_id;
845        let table_columns = self.table_columns;
846        let order_types = self.order_types;
847        let pk_indices = self.pk_indices;
848        let dist_key_in_pk_indices = self.dist_key_in_pk_indices;
849        let vnode_col_idx_in_pk = self.vnode_col_idx_in_pk;
850        let prefix_hint_len = self.prefix_hint_len;
851        let metrics = self.metrics;
852
853        let op_consistency_level = self
854            .op_consistency_level
855            .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue);
856
857        let output_column_ids = self.output_column_ids.unwrap_or_default();
858
859        let data_types: Vec<DataType> = table_columns
860            .iter()
861            .map(|col| col.data_type.clone())
862            .collect();
863
864        // For replicated state tables (used in hash temporal join), the join key (pk_prefix) is
865        // guaranteed by the optimizer to cover the distribution key, which is required by
866        // `compute_prefix_vnode`. Assert this invariant at build time.
867        if IS_REPLICATED && prefix_hint_len > 0 {
868            assert!(
869                dist_key_in_pk_indices.iter().all(|&d| d < prefix_hint_len),
870                "replicated state table: distribution key indices {:?} must all be covered by \
871                 prefix_hint_len {}",
872                dist_key_in_pk_indices,
873                prefix_hint_len,
874            );
875        }
876
877        let distribution =
878            TableDistribution::new(self.vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
879        assert_eq!(
880            distribution.vnode_count(),
881            self.expected_vnode_count,
882            "vnode count mismatch, scanning table {} under wrong distribution?",
883            self.table_name_for_debug,
884        );
885
886        let pk_data_types = pk_indices
887            .iter()
888            .map(|i| table_columns[*i].data_type.clone())
889            .collect();
890        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);
891
892        let input_value_indices = self.value_indices;
893
894        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();
895
896        // if value_indices is the no shuffle full columns.
897        let value_indices = match input_value_indices.len() == table_columns.len()
898            && input_value_indices == no_shuffle_value_indices
899        {
900            true => None,
901            false => Some(input_value_indices.clone()),
902        };
903
904        let row_serde = Arc::new(SD::new(
905            Arc::from_iter(input_value_indices.iter().copied()),
906            Arc::from(table_columns.clone().into_boxed_slice()),
907        ));
908
909        let state_table_op_consistency_level = op_consistency_level;
910        let op_consistency_level = match state_table_op_consistency_level {
911            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
912            StateTableOpConsistencyLevel::ConsistentOldValue => {
913                consistent_old_value_op(row_serde.clone(), false)
914            }
915            StateTableOpConsistencyLevel::LogStoreEnabled => {
916                consistent_old_value_op(row_serde.clone(), true)
917            }
918        };
919
920        let table_option = TableOption::new(self.retention_seconds);
921        let new_local_options = if IS_REPLICATED {
922            NewLocalOptions::new_replicated(
923                table_id,
924                self.fragment_id,
925                op_consistency_level,
926                table_option,
927                distribution.vnodes().clone(),
928            )
929        } else {
930            NewLocalOptions::new(
931                table_id,
932                self.fragment_id,
933                op_consistency_level,
934                table_option,
935                distribution.vnodes().clone(),
936                true,
937            )
938        };
939        let local_state_store = self.store.new_local(new_local_options).await;
940
941        // If state table has versioning, that means it supports
942        // Schema change. In that case, the row encoding should be column aware as well.
943        // Otherwise both will be false.
944        // NOTE(kwannoel): Replicated table will follow upstream table's versioning. I'm not sure
945        // If ALTER TABLE will propagate to this replicated table as well. Ideally it won't
946        assert_eq!(self.versioned, row_serde.kind().is_column_aware());
947
948        // Get info for replicated state table.
949        let output_column_ids_to_input_idx = output_column_ids
950            .iter()
951            .enumerate()
952            .map(|(pos, id)| (*id, pos))
953            .collect::<HashMap<_, _>>();
954
955        let columns = table_columns;
956
957        // Compute i2o mapping
958        // Note that this can be a partial mapping, since we use the i2o mapping to get
959        // any 1 of the output columns, and use that to fill the input column.
960        let mut i2o_mapping = vec![None; columns.len()];
961        for (i, column) in columns.iter().enumerate() {
962            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
963                i2o_mapping[i] = Some(*pos);
964            }
965        }
966        // We can prune any duplicate column indices
967        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());
968
969        // Compute output indices
970        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
971
972        // For replicated state tables, pk columns must be explicitly provided by the write caller
973        // rather than being filled with None internally via i2o_mapping.
974        if IS_REPLICATED {
975            assert!(
976                pk_indices
977                    .iter()
978                    .all(|&pk_idx| output_indices.contains(&pk_idx)),
979                "all pk columns must be included in output_column_ids for replicated state table"
980            );
981        }
982
983        let clean_watermark_index = self.clean_watermark_index;
984        let watermark_serde = clean_watermark_index.map(|idx| {
985            let pk_idx = pk_indices.iter().position(|&i| i == idx);
986            let (watermark_serde, watermark_serde_type) = match pk_idx {
987                Some(0) => (pk_serde.index(0).into_owned(), WatermarkSerdeType::PkPrefix),
988                Some(pk_idx) => (
989                    pk_serde.index(pk_idx).into_owned(),
990                    WatermarkSerdeType::NonPkPrefix,
991                ),
992                None => (
993                    OrderedRowSerde::new(
994                        vec![data_types[idx].clone()],
995                        vec![OrderType::ascending()],
996                    ),
997                    WatermarkSerdeType::Value,
998                ),
999            };
1000            (watermark_serde, watermark_serde_type)
1001        });
1002
1003        // Restore persisted table watermark.
1004        let committed_watermark = if let Some((deser, _)) = watermark_serde.as_ref() {
1005            distribution
1006                .vnodes()
1007                .iter_vnodes()
1008                .filter_map(|vnode| {
1009                    let bytes = local_state_store.get_table_watermark(vnode)?;
1010                    let datum = deser.deserialize(&bytes).ok().and_then(|row| {
1011                        assert!(row.len() == 1);
1012                        row[0].clone()
1013                    });
1014                    if datum.is_none() {
1015                        tracing::error!(
1016                            ?vnode,
1017                            watermark = ?bytes,
1018                            "Failed to deserialize persisted watermark from state store.",
1019                        );
1020                    }
1021                    datum
1022                })
1023                .max_by(|a, b| cmp_datum(Some(a), Some(b), OrderType::ascending()))
1024        } else {
1025            None
1026        };
1027
1028        StateTableInner {
1029            table_id,
1030            row_store: StateTableRowStore {
1031                all_rows: preload_all_rows.then(HashMap::new),
1032                state_store: local_state_store,
1033                row_serde,
1034                pk_serde: pk_serde.clone(),
1035                table_id,
1036                // Need to maintain vnode min/max key stats when vnode key pruning is enabled
1037                vnode_stats: should_enable_vnode_key_stats.then(HashMap::new),
1038                enable_state_table_vnode_stats_pruning: self.enable_state_table_vnode_stats_pruning,
1039                metrics,
1040            },
1041            store: self.store,
1042            epoch: None,
1043            pk_serde,
1044            pk_indices,
1045            distribution,
1046            prefix_hint_len,
1047            value_indices,
1048            pending_watermark: None,
1049            committed_watermark,
1050            watermark_serde,
1051            data_types,
1052            output_indices,
1053            i2o_mapping,
1054            op_consistency_level: state_table_op_consistency_level,
1055            clean_watermark_index,
1056            on_post_commit: false,
1057        }
1058    }
1059}
1060
1061impl<S: StateStore, SD: ValueRowSerde, const IS_REPLICATED: bool>
1062    StateTableBuilder<S, SD, IS_REPLICATED, ()>
1063{
1064    pub fn new_from_storage_table_desc(
1065        table_desc: &StorageTableDesc,
1066        store: S,
1067        vnodes: Option<Arc<Bitmap>>,
1068        fragment_id: u32,
1069    ) -> Self {
1070        let table_id = table_desc.table_id;
1071        let table_columns: Vec<ColumnDesc> =
1072            table_desc.columns.iter().map(ColumnDesc::from).collect();
1073        let order_types: Vec<OrderType> = table_desc
1074            .pk
1075            .iter()
1076            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
1077            .collect();
1078        let pk_indices = table_desc
1079            .pk
1080            .iter()
1081            .map(|col_order| col_order.column_index as usize)
1082            .collect_vec();
1083        let dist_key_in_pk_indices = table_desc
1084            .dist_key_in_pk_indices
1085            .iter()
1086            .map(|&idx| idx as usize)
1087            .collect();
1088        // StorageTableDesc provides vnode_col_idx_in_pk directly (already pk-relative),
1089        // unlike Table which has an absolute column index that needs conversion.
1090        let vnode_col_idx_in_pk = table_desc.vnode_col_idx_in_pk.map(|k| k as usize);
1091        let raw_value_indices = table_desc
1092            .value_indices
1093            .iter()
1094            .map(|val| *val as usize)
1095            .collect_vec();
1096
1097        Self {
1098            table_id,
1099            table_name_for_debug: table_id.to_string(),
1100            table_columns,
1101            order_types,
1102            pk_indices,
1103            dist_key_in_pk_indices,
1104            vnode_col_idx_in_pk,
1105            expected_vnode_count: table_desc.vnode_count(),
1106            value_indices: raw_value_indices,
1107            prefix_hint_len: table_desc.read_prefix_len_hint as usize,
1108            retention_seconds: table_desc.retention_seconds,
1109            versioned: table_desc.versioned,
1110            fragment_id: fragment_id.into(),
1111            clean_watermark_index: None,
1112            store,
1113            vnodes,
1114            op_consistency_level: None,
1115            output_column_ids: None,
1116            preload_all_rows: (),
1117            enable_vnode_key_stats: None,
1118            enable_state_table_vnode_stats_pruning: false,
1119            metrics: None,
1120            _serde: Default::default(),
1121        }
1122    }
1123}
1124
1125impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1126where
1127    S: StateStore,
1128    SD: ValueRowSerde,
1129{
1130    pub fn get_data_types(&self) -> &[DataType] {
1131        &self.data_types
1132    }
1133
1134    pub fn table_id(&self) -> TableId {
1135        self.table_id
1136    }
1137
1138    /// Get the vnode value with given (prefix of) primary key
1139    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
1140        self.distribution
1141            .try_compute_vnode_by_pk_prefix(pk_prefix)
1142            .expect("For streaming, the given prefix must be enough to calculate the vnode")
1143    }
1144
1145    /// Get the vnode value of the given primary key
1146    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
1147        self.distribution.compute_vnode_by_pk(pk)
1148    }
1149
1150    /// NOTE(kwannoel): This is used by backfill.
1151    /// We want to check pk indices of upstream table.
1152    pub fn pk_indices(&self) -> &[usize] {
1153        &self.pk_indices
1154    }
1155
1156    /// Get the indices of the primary key columns in the output columns.
1157    ///
1158    /// Returns `None` if any of the primary key columns is not in the output columns.
1159    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
1160        assert!(IS_REPLICATED);
1161        self.pk_indices
1162            .iter()
1163            .map(|&i| self.output_indices.iter().position(|&j| i == j))
1164            .collect()
1165    }
1166
1167    pub fn pk_serde(&self) -> &OrderedRowSerde {
1168        &self.pk_serde
1169    }
1170
1171    pub fn vnodes(&self) -> &Arc<Bitmap> {
1172        self.distribution.vnodes()
1173    }
1174
1175    pub fn value_indices(&self) -> &Option<Vec<usize>> {
1176        &self.value_indices
1177    }
1178
1179    pub fn is_consistent_op(&self) -> bool {
1180        matches!(
1181            self.op_consistency_level,
1182            StateTableOpConsistencyLevel::ConsistentOldValue
1183                | StateTableOpConsistencyLevel::LogStoreEnabled
1184        )
1185    }
1186
1187    pub fn metrics(&self) -> Option<&StateTableMetrics> {
1188        self.row_store.metrics.as_ref()
1189    }
1190}
1191
1192impl<S, SD> StateTableInner<S, SD, true>
1193where
1194    S: StateStore,
1195    SD: ValueRowSerde,
1196{
1197    /// Create replicated state table from table catalog with output indices
1198    pub async fn new_replicated(
1199        table_catalog: &Table,
1200        store: S,
1201        vnodes: Option<Arc<Bitmap>>,
1202        output_column_ids: Vec<ColumnId>,
1203    ) -> Self {
1204        // TODO: can it be ConsistentOldValue?
1205        // TODO: may enable preload_all_rows
1206        StateTableBuilder::new(table_catalog, store, vnodes)
1207            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
1208            .with_output_column_ids(output_column_ids)
1209            .forbid_preload_all_rows()
1210            .build()
1211            .await
1212    }
1213}
1214
1215// point get
1216impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1217where
1218    S: StateStore,
1219    SD: ValueRowSerde,
1220{
1221    /// Get a single row from state table.
1222    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
1223        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1224        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
1225        match row {
1226            Some(row) => {
1227                if IS_REPLICATED {
1228                    // If the table is replicated, we need to deserialize the row with the output
1229                    // indices.
1230                    let row = row.project(&self.output_indices);
1231                    Ok(Some(row.into_owned_row()))
1232                } else {
1233                    Ok(Some(row))
1234                }
1235            }
1236            None => Ok(None),
1237        }
1238    }
1239
1240    /// Get a raw encoded row from state table.
1241    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
1242        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
1243        self.row_store.exists(serialized_pk, prefix_hint).await
1244    }
1245
1246    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
1247        assert!(pk.len() <= self.pk_indices.len());
1248        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
1249    }
1250
1251    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
1252        let serialized_pk = self.serialize_pk(&pk);
1253        let prefix_hint = if should_calculate_prefix_hint(self.prefix_hint_len, pk.len(), false) {
1254            Some(serialized_pk.slice(VirtualNode::SIZE..))
1255        } else {
1256            #[cfg(debug_assertions)]
1257            if self.prefix_hint_len != 0 {
1258                warn!(
1259                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
1260                );
1261            }
1262            None
1263        };
1264        (serialized_pk, prefix_hint)
1265    }
1266}
1267
1268impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1269    async fn get(
1270        &self,
1271        key_bytes: TableKey<Bytes>,
1272        prefix_hint: Option<Bytes>,
1273    ) -> StreamExecutorResult<Option<OwnedRow>> {
1274        if let Some(m) = &self.metrics {
1275            m.get_count.inc();
1276        }
1277        if let Some(rows) = &self.all_rows {
1278            let (vnode, key) = key_bytes.split_vnode_bytes();
1279            return Ok(rows.get(&vnode).expect("covered vnode").get(&key).cloned());
1280        }
1281
1282        // Try to prune using vnode statistics
1283        let should_prune = if let Some(stats) = &self.vnode_stats
1284            && let (vnode, key) = key_bytes.split_vnode_bytes()
1285            && let Some(vnode_stat) = stats.get(&vnode)
1286            && vnode_stat.can_prune(&key)
1287        {
1288            if let Some(m) = &self.metrics {
1289                m.get_vnode_pruned_count.inc();
1290            }
1291            true
1292        } else {
1293            false
1294        };
1295
1296        if should_prune && self.enable_state_table_vnode_stats_pruning {
1297            return Ok(None);
1298        }
1299
1300        let read_options = ReadOptions {
1301            prefix_hint,
1302            cache_policy: CachePolicy::Fill(Hint::Normal),
1303            ..Default::default()
1304        };
1305
1306        let result = self
1307            .state_store
1308            .on_key_value(key_bytes, read_options, move |_, value| {
1309                let row = self.row_serde.deserialize(value)?;
1310                Ok(OwnedRow::new(row))
1311            })
1312            .await
1313            .map_err(Into::<StreamExecutorError>::into)?;
1314
1315        // In dry-run mode, verify that pruning would have been correct
1316        if should_prune && result.is_some() {
1317            tracing::warn!(
1318                table_id = %self.table_id,
1319                "vnode stats pruning dry run fails for get. This will not affect correctness."
1320            );
1321        }
1322
1323        Ok(result)
1324    }
1325
1326    async fn exists(
1327        &self,
1328        key_bytes: TableKey<Bytes>,
1329        prefix_hint: Option<Bytes>,
1330    ) -> StreamExecutorResult<bool> {
1331        if let Some(m) = &self.metrics {
1332            m.get_count.inc();
1333        }
1334        if let Some(rows) = &self.all_rows {
1335            let (vnode, key) = key_bytes.split_vnode_bytes();
1336            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(&key));
1337        }
1338
1339        // Try to prune using vnode statistics
1340        let should_prune = if let Some(stats) = &self.vnode_stats
1341            && let (vnode, key) = key_bytes.split_vnode_bytes()
1342            && let Some(vnode_stat) = stats.get(&vnode)
1343            && vnode_stat.can_prune(&key)
1344        {
1345            if let Some(m) = &self.metrics {
1346                m.get_vnode_pruned_count.inc();
1347            }
1348            true
1349        } else {
1350            false
1351        };
1352
1353        if should_prune && self.enable_state_table_vnode_stats_pruning {
1354            return Ok(false);
1355        }
1356
1357        let read_options = ReadOptions {
1358            prefix_hint,
1359            cache_policy: CachePolicy::Fill(Hint::Normal),
1360            ..Default::default()
1361        };
1362        let result = self
1363            .state_store
1364            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
1365            .await?;
1366        let exists = result.is_some();
1367
1368        // In dry-run mode, verify that pruning would have been correct
1369        if should_prune && exists {
1370            tracing::warn!(
1371                table_id = %self.table_id,
1372                "vnode stats pruning dry run fails for exists. This will not affect correctness."
1373            );
1374        }
1375
1376        Ok(exists)
1377    }
1378}
1379
1380/// A callback struct returned from [`StateTableInner::commit`].
1381///
1382/// Introduced to support single barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
1383/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
1384/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
1385///
1386/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
1387/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
1388/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
1389/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit`, to indicate that whether the `StateTablePostCommit` is handled
1390/// properly. On `state_table.commit()`, we will mark the `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
1391/// remark the flag as false, and on `state_table.commit()`, we will assert that the `on_post_commit` must be false. Note that, the `post_yield_barrier`
1392/// should be called for all barriers rather than only for the barrier with update vnode bitmap. In this way, though we don't have scale test for all
1393/// streaming executor, we can ensure that all executor covered by normal e2e test have properly handled the `StateTablePostCommit`.
1394#[must_use]
1395pub struct StateTablePostCommit<'a, S, SD = BasicSerde, const IS_REPLICATED: bool = false>
1396where
1397    S: StateStore,
1398    SD: ValueRowSerde,
1399{
1400    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1401}
1402
1403impl<'a, S, SD, const IS_REPLICATED: bool> StateTablePostCommit<'a, S, SD, IS_REPLICATED>
1404where
1405    S: StateStore,
1406    SD: ValueRowSerde,
1407{
1408    /// Returns `Some((new_vnodes, old_vnodes, state_table), keyed_cache_may_stale)` if the vnode bitmap is updated.
1409    ///
1410    /// Note the `keyed_cache_may_stale` only applies to keyed cache. If the executor's cache is not keyed, but will
1411    /// be consumed with all vnodes it owns, the executor may need to ALWAYS clear the cache regardless of this flag.
1412    pub async fn post_yield_barrier(
1413        mut self,
1414        new_vnodes: Option<Arc<Bitmap>>,
1415    ) -> StreamExecutorResult<
1416        Option<(
1417            (
1418                Arc<Bitmap>,
1419                Arc<Bitmap>,
1420                &'a mut StateTableInner<S, SD, IS_REPLICATED>,
1421            ),
1422            bool,
1423        )>,
1424    > {
1425        self.inner.on_post_commit = false;
1426        Ok(if let Some(new_vnodes) = new_vnodes {
1427            let (old_vnodes, keyed_cache_may_stale) =
1428                self.update_vnode_bitmap(new_vnodes.clone()).await?;
1429            Some(((new_vnodes, old_vnodes, self.inner), keyed_cache_may_stale))
1430        } else {
1431            None
1432        })
1433    }
1434
1435    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED> {
1436        &*self.inner
1437    }
1438
1439    /// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
1440    async fn update_vnode_bitmap(
1441        &mut self,
1442        new_vnodes: Arc<Bitmap>,
1443    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
1444        let prev_vnodes = self
1445            .inner
1446            .row_store
1447            .update_vnode_bitmap(new_vnodes.clone())
1448            .await?;
1449        assert_eq!(
1450            &prev_vnodes,
1451            self.inner.vnodes(),
1452            "state table and state store vnode bitmap mismatches"
1453        );
1454
1455        if self.inner.distribution.is_singleton() {
1456            assert_eq!(
1457                &new_vnodes,
1458                self.inner.vnodes(),
1459                "should not update vnode bitmap for singleton table"
1460            );
1461        }
1462        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());
1463
1464        let keyed_cache_may_stale = keyed_cache_may_stale(self.inner.vnodes(), &new_vnodes);
1465
1466        if keyed_cache_may_stale {
1467            self.inner.pending_watermark = None;
1468        }
1469
1470        Ok((
1471            self.inner.distribution.update_vnode_bitmap(new_vnodes),
1472            keyed_cache_may_stale,
1473        ))
1474    }
1475}
1476
1477// write
1478impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1479    fn handle_mem_table_error(&self, e: StorageError) {
1480        let e = match e.into_inner() {
1481            ErrorKind::MemTable(e) => e,
1482            _ => unreachable!("should only get memtable error"),
1483        };
1484        match *e {
1485            MemTableError::InconsistentOperation { key, prev, new, .. } => {
1486                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
1487                panic!(
1488                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
1489                    self.table_id,
1490                    vnode,
1491                    &key,
1492                    prev.debug_fmt(&*self.row_serde),
1493                    new.debug_fmt(&*self.row_serde),
1494                )
1495            }
1496        }
1497    }
1498
1499    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
1500        insane_mode_discard_point!();
1501        let value_bytes = self.row_serde.serialize(&value).into();
1502
1503        let (vnode, key_without_vnode) = key.split_vnode_bytes();
1504
1505        // Update vnode statistics (skip if all_rows is present)
1506        if self.all_rows.is_none()
1507            && let Some(stats) = &mut self.vnode_stats
1508            && let Some(vnode_stat) = stats.get_mut(&vnode)
1509        {
1510            vnode_stat.update_with_key(&key_without_vnode);
1511        }
1512
1513        if let Some(rows) = &mut self.all_rows {
1514            rows.get_mut(&vnode)
1515                .expect("covered vnode")
1516                .insert(key_without_vnode, value.into_owned_row());
1517        }
1518        self.state_store
1519            .insert(key, value_bytes, None)
1520            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1521    }
1522
1523    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
1524        insane_mode_discard_point!();
1525        let value_bytes = self.row_serde.serialize(value).into();
1526
1527        let (vnode, key_without_vnode) = key.split_vnode_bytes();
1528
1529        if self.all_rows.is_none()
1530            && let Some(stats) = &mut self.vnode_stats
1531            && let Some(vnode_stat) = stats.get_mut(&vnode)
1532        {
1533            vnode_stat.update_with_key(&key_without_vnode);
1534        }
1535
1536        if let Some(rows) = &mut self.all_rows {
1537            rows.get_mut(&vnode)
1538                .expect("covered vnode")
1539                .remove(&key_without_vnode);
1540        }
1541        self.state_store
1542            .delete(key, value_bytes)
1543            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1544    }
1545
1546    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
1547        insane_mode_discard_point!();
1548        let new_value_bytes = self.row_serde.serialize(&new_value).into();
1549        let old_value_bytes = self.row_serde.serialize(old_value).into();
1550
1551        let (vnode, key_without_vnode) = key_bytes.split_vnode_bytes();
1552
1553        // Update does not change the key, so statistics remain valid (skip if all_rows is present)
1554        // But we update to ensure consistency
1555        if self.all_rows.is_none()
1556            && let Some(stats) = &mut self.vnode_stats
1557            && let Some(vnode_stat) = stats.get_mut(&vnode)
1558        {
1559            vnode_stat.update_with_key(&key_without_vnode);
1560        }
1561
1562        if let Some(rows) = &mut self.all_rows {
1563            rows.get_mut(&vnode)
1564                .expect("covered vnode")
1565                .insert(key_without_vnode, new_value.into_owned_row());
1566        }
1567        self.state_store
1568            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
1569            .unwrap_or_else(|e| self.handle_mem_table_error(e));
1570    }
1571}
1572
1573impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1574where
1575    S: StateStore,
1576    SD: ValueRowSerde,
1577{
1578    /// Insert a row into state table. Must provide a full row corresponding to the column desc of
1579    /// the table.
1580    pub fn insert(&mut self, value: impl Row) {
1581        let pk_indices = &self.pk_indices;
1582        let pk = (&value).project(pk_indices);
1583
1584        let key_bytes = self.serialize_pk(&pk);
1585        dispatch_value_indices!(&self.value_indices, [value], {
1586            self.row_store.insert(key_bytes, value)
1587        })
1588    }
1589
1590    /// Delete a row from state table. Must provide a full row of old value corresponding to the
1591    /// column desc of the table.
1592    pub fn delete(&mut self, old_value: impl Row) {
1593        let pk_indices = &self.pk_indices;
1594        let pk = (&old_value).project(pk_indices);
1595
1596        let key_bytes = self.serialize_pk(&pk);
1597        dispatch_value_indices!(&self.value_indices, [old_value], {
1598            self.row_store.delete(key_bytes, old_value)
1599        })
1600    }
1601
1602    /// Update a row. The old and new value should have the same pk.
1603    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
1604        let old_pk = (&old_value).project(self.pk_indices());
1605        let new_pk = (&new_value).project(self.pk_indices());
1606        debug_assert!(
1607            Row::eq(&old_pk, new_pk),
1608            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
1609            self.table_id
1610        );
1611
1612        let key_bytes = self.serialize_pk(&new_pk);
1613        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
1614            self.row_store.update(key_bytes, old_value, new_value)
1615        })
1616    }
1617
1618    /// Write a record into state table. Must have the same schema with the table.
1619    pub fn write_record(&mut self, record: Record<impl Row>) {
1620        match record {
1621            Record::Insert { new_row } => self.insert(new_row),
1622            Record::Delete { old_row } => self.delete(old_row),
1623            Record::Update { old_row, new_row } => self.update(old_row, new_row),
1624        }
1625    }
1626
1627    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
1628        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
1629    }
1630
1631    /// Write batch with a `StreamChunk` which should have the same schema with the table.
1632    // allow(izip, which use zip instead of zip_eq)
1633    #[allow(clippy::disallowed_methods)]
1634    pub fn write_chunk(&mut self, chunk: StreamChunk) {
1635        let chunk = if IS_REPLICATED {
1636            self.fill_non_output_indices(chunk)
1637        } else {
1638            chunk
1639        };
1640
1641        let vnodes = self
1642            .distribution
1643            .compute_chunk_vnode(&chunk, &self.pk_indices);
1644
1645        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
1646            let Some((op, row)) = optional_row else {
1647                continue;
1648            };
1649            let pk = row.project(&self.pk_indices);
1650            let vnode = vnodes[idx];
1651            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
1652            match op {
1653                Op::Insert | Op::UpdateInsert => {
1654                    dispatch_value_indices!(&self.value_indices, [row], {
1655                        self.row_store.insert(key_bytes, row);
1656                    });
1657                }
1658                Op::Delete | Op::UpdateDelete => {
1659                    dispatch_value_indices!(&self.value_indices, [row], {
1660                        self.row_store.delete(key_bytes, row);
1661                    });
1662                }
1663            }
1664        }
1665    }
1666
1667    /// Update watermark for state cleaning.
1668    ///
1669    /// # Arguments
1670    ///
1671    /// * `watermark` - Latest watermark received.
1672    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
1673        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
1674        self.pending_watermark = Some(watermark);
1675    }
1676
1677    /// Get the committed watermark of the state table. Watermarks should be fed into the state
1678    /// table through `update_watermark` method.
1679    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
1680        self.committed_watermark.as_ref()
1681    }
1682
1683    pub async fn commit(
1684        &mut self,
1685        new_epoch: EpochPair,
1686    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1687        self.commit_inner(new_epoch, None).await
1688    }
1689
1690    #[cfg(test)]
1691    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
1692        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
1693    }
1694
1695    pub async fn commit_assert_no_update_vnode_bitmap(
1696        &mut self,
1697        new_epoch: EpochPair,
1698    ) -> StreamExecutorResult<()> {
1699        let post_commit = self.commit_inner(new_epoch, None).await?;
1700        post_commit.post_yield_barrier(None).await?;
1701        Ok(())
1702    }
1703
1704    pub async fn commit_may_switch_consistent_op(
1705        &mut self,
1706        new_epoch: EpochPair,
1707        op_consistency_level: StateTableOpConsistencyLevel,
1708    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1709        if self.op_consistency_level != op_consistency_level {
1710            // avoid flooding e2e-test log
1711            if !cfg!(debug_assertions) {
1712                info!(
1713                    ?new_epoch,
1714                    prev_op_consistency_level = ?self.op_consistency_level,
1715                    ?op_consistency_level,
1716                    table_id = %self.table_id,
1717                    "switch to new op consistency level"
1718                );
1719            }
1720            self.commit_inner(new_epoch, Some(op_consistency_level))
1721                .await
1722        } else {
1723            self.commit_inner(new_epoch, None).await
1724        }
1725    }
1726
1727    async fn commit_inner(
1728        &mut self,
1729        new_epoch: EpochPair,
1730        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1731    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED>> {
1732        assert!(!self.on_post_commit);
1733        assert_eq!(
1734            self.epoch.expect("should only be called after init").curr,
1735            new_epoch.prev
1736        );
1737        if let Some(new_consistency_level) = switch_consistent_op {
1738            assert_ne!(self.op_consistency_level, new_consistency_level);
1739            self.op_consistency_level = new_consistency_level;
1740        }
1741        trace!(
1742            table_id = %self.table_id,
1743            epoch = ?self.epoch,
1744            "commit state table"
1745        );
1746
1747        let table_watermarks = self.commit_pending_watermark();
1748        self.row_store
1749            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1750            .await?;
1751        self.epoch = Some(new_epoch);
1752
1753        self.on_post_commit = true;
1754        Ok(StateTablePostCommit { inner: self })
1755    }
1756
1757    /// Commit pending watermark and return vnode bitmap-watermark pairs to seal.
1758    fn commit_pending_watermark(
1759        &mut self,
1760    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1761        let watermark = self.pending_watermark.take()?;
1762        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1763
1764        assert!(
1765            !self.pk_indices().is_empty(),
1766            "see pending watermark on empty pk"
1767        );
1768        let (watermark_serializer, watermark_type) = self
1769            .watermark_serde
1770            .as_ref()
1771            .expect("watermark serde should be initialized to commit watermark");
1772        let watermark_suffix =
1773            serialize_row(row::once(Some(watermark.clone())), watermark_serializer);
1774        let vnode_watermark = VnodeWatermark::new(
1775            self.vnodes().clone(),
1776            Bytes::copy_from_slice(watermark_suffix.as_ref()),
1777        );
1778        trace!(table_id = %self.table_id, ?vnode_watermark, "table watermark");
1779
1780        let order_type = watermark_serializer.get_order_types().get(0).unwrap();
1781        let direction = if order_type.is_ascending() {
1782            WatermarkDirection::Ascending
1783        } else {
1784            WatermarkDirection::Descending
1785        };
1786
1787        self.committed_watermark = Some(watermark);
1788        Some((direction, vec![vnode_watermark], *watermark_type))
1789    }
1790
1791    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1792        self.row_store.try_flush().await?;
1793        Ok(())
1794    }
1795}
1796
1797// Manually expand trait alias for better IDE experience.
1798pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
1799impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1800
1801pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
1802impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1803
1804pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
1805impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1806
1807pub type BoxedRowStream<'a> = BoxStream<'a, StreamExecutorResult<OwnedRow>>;
1808
1809pub trait FromVnodeBytes {
1810    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
1811}
1812
1813impl FromVnodeBytes for Bytes {
1814    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
1815        prefix_slice_with_vnode(vnode, bytes)
1816    }
1817}
1818
1819impl FromVnodeBytes for () {
1820    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
1821}
1822
1823// Iterator functions
1824impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
1825where
1826    S: StateStore,
1827    SD: ValueRowSerde,
1828{
1829    /// This function scans rows from the relational table with specific `pk_range` under the same
1830    /// `vnode`.
1831    pub async fn iter_with_vnode(
1832        &self,
1833
1834        // Optional vnode that returns an iterator only over the given range under that vnode.
1835        // For now, we require this parameter, and will panic. In the future, when `None`, we can
1836        // iterate over each vnode that the `StateTableInner` owns.
1837        vnode: VirtualNode,
1838        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1839        prefetch_options: PrefetchOptions,
1840    ) -> StreamExecutorResult<impl RowStream<'_>> {
1841        Ok(self
1842            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1843            .await?
1844            .map_ok(|(_, row)| row))
1845    }
1846
1847    pub async fn iter_keyed_row_with_vnode(
1848        &self,
1849        vnode: VirtualNode,
1850        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1851        prefetch_options: PrefetchOptions,
1852    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1853        Ok(self
1854            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1855            .await?
1856            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1857    }
1858
1859    pub async fn iter_with_vnode_and_output_indices(
1860        &self,
1861        vnode: VirtualNode,
1862        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1863        prefetch_options: PrefetchOptions,
1864    ) -> StreamExecutorResult<impl RowStream<'_>> {
1865        assert!(IS_REPLICATED);
1866        let stream = self
1867            .iter_with_vnode(vnode, pk_range, prefetch_options)
1868            .await?;
1869        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
1870    }
1871}
1872
1873impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
1874    // The lowest-level API.
1875    /// Middle-level APIs:
1876    /// - [`StateTableInner::iter_with_prefix_inner`]
1877    /// - [`StateTableInner::iter_kv_with_pk_range`]
1878    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1879        &self,
1880        vnode: VirtualNode,
1881        (start, end): (Bound<Bytes>, Bound<Bytes>),
1882        prefix_hint: Option<Bytes>,
1883        prefetch_options: PrefetchOptions,
1884    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1885        if let Some(m) = &self.metrics {
1886            m.iter_count.inc();
1887        }
1888        // Check if we can prune the entire range using vnode statistics
1889        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1890            &self.vnode_stats
1891            && let Some(vnode_stat) = stats.get(&vnode)
1892        {
1893            match vnode_stat.pruned_key_range(&start, &end) {
1894                Some((new_start, new_end)) => {
1895                    if self.enable_state_table_vnode_stats_pruning {
1896                        (new_start, new_end, false)
1897                    } else {
1898                        // In dry-run mode, we don't apply pruning but verify correctness
1899                        (start, end, false)
1900                    }
1901                }
1902                None => {
1903                    if let Some(m) = &self.metrics {
1904                        m.iter_vnode_pruned_count.inc();
1905                    }
1906                    // Mark that we should prune entirely, but handle dry-run below
1907                    (start.clone(), end.clone(), true)
1908                }
1909            }
1910        } else {
1911            (start, end, false)
1912        };
1913
1914        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
1915            return Ok(futures::future::Either::Left(futures::stream::empty()));
1916        }
1917
1918        let table_id = self.table_id;
1919        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
1920            // Only log when in dry-run mode and we would have pruned but got results
1921            if should_prune_entirely && result.is_ok() {
1922                tracing::warn!(
1923                    table_id = %table_id,
1924                    "vnode stats pruning dry run fails for iter. This will not affect correctness."
1925                );
1926            }
1927        };
1928
1929        if let Some(rows) = &self.all_rows {
1930            return Ok(futures::future::Either::Right(
1931                futures::future::Either::Left(
1932                    futures::stream::iter(
1933                        rows.get(&vnode)
1934                            .expect("covered vnode")
1935                            .range((pruned_start, pruned_end))
1936                            .map(move |(key, value)| {
1937                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
1938                            }),
1939                    )
1940                    .inspect(inspect_fn),
1941                ),
1942            ));
1943        }
1944        let read_options = ReadOptions {
1945            prefix_hint,
1946            prefetch_options,
1947            cache_policy: CachePolicy::Fill(Hint::Normal),
1948        };
1949
1950        Ok(futures::future::Either::Right(
1951            futures::future::Either::Right(
1952                deserialize_keyed_row_stream(
1953                    self.state_store
1954                        .iter(
1955                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
1956                            read_options,
1957                        )
1958                        .await?,
1959                    &*self.row_serde,
1960                )
1961                .inspect(inspect_fn),
1962            ),
1963        ))
1964    }
1965
1966    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1967        &self,
1968        vnode: VirtualNode,
1969        (start, end): (Bound<Bytes>, Bound<Bytes>),
1970        prefix_hint: Option<Bytes>,
1971        prefetch_options: PrefetchOptions,
1972    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1973        if let Some(m) = &self.metrics {
1974            m.iter_count.inc();
1975        }
1976        // Check if we can prune the entire range using vnode statistics
1977        let (pruned_start, pruned_end, should_prune_entirely) = if let Some(stats) =
1978            &self.vnode_stats
1979            && let Some(vnode_stat) = stats.get(&vnode)
1980        {
1981            match vnode_stat.pruned_key_range(&start, &end) {
1982                Some((new_start, new_end)) => {
1983                    if self.enable_state_table_vnode_stats_pruning {
1984                        (new_start, new_end, false)
1985                    } else {
1986                        // In dry-run mode, we don't apply pruning but verify correctness
1987                        (start, end, false)
1988                    }
1989                }
1990                None => {
1991                    if let Some(m) = &self.metrics {
1992                        m.iter_vnode_pruned_count.inc();
1993                    }
1994                    // Mark that we should prune entirely, but handle dry-run below
1995                    (start, end, true)
1996                }
1997            }
1998        } else {
1999            (start, end, false)
2000        };
2001
2002        if should_prune_entirely && self.enable_state_table_vnode_stats_pruning {
2003            return Ok(futures::future::Either::Left(futures::stream::empty()));
2004        }
2005
2006        let table_id = self.table_id;
2007        let inspect_fn = move |result: &StreamExecutorResult<(K, OwnedRow)>| {
2008            // Only log when in dry-run mode and we would have pruned but got results
2009            if should_prune_entirely && result.is_ok() {
2010                tracing::warn!(
2011                    table_id = %table_id,
2012                    "vnode stats pruning dry run fails for rev_iter. This will not affect correctness."
2013                );
2014            }
2015        };
2016
2017        if let Some(rows) = &self.all_rows {
2018            return Ok(futures::future::Either::Right(
2019                futures::future::Either::Left(
2020                    futures::stream::iter(
2021                        rows.get(&vnode)
2022                            .expect("covered vnode")
2023                            .range((pruned_start, pruned_end))
2024                            .rev()
2025                            .map(move |(key, value)| {
2026                                Ok((K::from_vnode_bytes(vnode, key), value.clone()))
2027                            }),
2028                    )
2029                    .inspect(inspect_fn),
2030                ),
2031            ));
2032        }
2033        let read_options = ReadOptions {
2034            prefix_hint,
2035            prefetch_options,
2036            cache_policy: CachePolicy::Fill(Hint::Normal),
2037        };
2038
2039        Ok(futures::future::Either::Right(
2040            futures::future::Either::Right(
2041                deserialize_keyed_row_stream(
2042                    self.state_store
2043                        .rev_iter(
2044                            prefixed_range_with_vnode((pruned_start, pruned_end), vnode),
2045                            read_options,
2046                        )
2047                        .await?,
2048                    &*self.row_serde,
2049                )
2050                .inspect(inspect_fn),
2051            ),
2052        ))
2053    }
2054}
2055
2056impl<S, SD, const IS_REPLICATED: bool> StateTableInner<S, SD, IS_REPLICATED>
2057where
2058    S: StateStore,
2059    SD: ValueRowSerde,
2060{
2061    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
2062    /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with specific `pk_prefix`.
2063    /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
2064    pub async fn iter_with_prefix(
2065        &self,
2066        pk_prefix: impl Row,
2067        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2068        prefetch_options: PrefetchOptions,
2069    ) -> StreamExecutorResult<impl RowStream<'_>> {
2070        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
2071            .await?;
2072        Ok(stream.map_ok(|(_, row)| row))
2073    }
2074
2075    /// This function scans rows from the relational table with specific `prefix` and `sub_range` under the same
2076    /// `vnode`, and filters out rows based on watermarks. It calls `iter_with_prefix` and further filters rows
2077    /// based on the table watermark retrieved from the state store.
2078    ///
2079    /// The caller must ensure that `clean_watermark_index` is set before calling this method, otherwise it will return all rows without filtering.
2080    pub async fn iter_with_prefix_respecting_watermark(
2081        &self,
2082        pk_prefix: impl Row,
2083        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2084        prefetch_options: PrefetchOptions,
2085    ) -> StreamExecutorResult<BoxedRowStream<'_>> {
2086        let vnode = self.compute_prefix_vnode(&pk_prefix);
2087        let Some(clean_watermark_index) = self.clean_watermark_index else {
2088            return self
2089                .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2090                .await
2091                .map(|s| s.boxed());
2092        };
2093        let Some((watermark_serde, watermark_type)) = &self.watermark_serde else {
2094            return Err(StreamExecutorError::from(anyhow!(
2095                "Missing watermark serde"
2096            )));
2097        };
2098        // Fast path. TableWatermarksIndex::rewrite_range_with_table_watermark has already filtered the rows.
2099        if matches!(watermark_type, WatermarkSerdeType::PkPrefix) {
2100            return self
2101                .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2102                .await
2103                .map(|s| s.boxed());
2104        }
2105
2106        let watermark_bytes = self.row_store.state_store.get_table_watermark(vnode);
2107        let Some(watermark_bytes) = watermark_bytes else {
2108            return self
2109                .iter_with_prefix(pk_prefix, sub_range, prefetch_options)
2110                .await
2111                .map(|s| s.boxed());
2112        };
2113        let watermark_row = watermark_serde.deserialize(&watermark_bytes)?;
2114        if watermark_row.len() != 1 {
2115            return Err(StreamExecutorError::from(format!(
2116                "Watermark row should have exactly 1 column, got {}",
2117                watermark_row.len()
2118            )));
2119        }
2120        let watermark_value = watermark_row[0].clone();
2121        // StateTableInner::update_watermark should ensure that the watermark is not NULL
2122        if watermark_value.is_none() {
2123            return Err(StreamExecutorError::from(anyhow!(
2124                "Watermark cannot be NULL"
2125            )));
2126        }
2127        let order_type = watermark_serde.get_order_types().get(0).ok_or_else(|| {
2128            StreamExecutorError::from(anyhow!(
2129                "Watermark serde should have at least one order type"
2130            ))
2131        })?;
2132
2133        let direction = if order_type.is_ascending() {
2134            WatermarkDirection::Ascending
2135        } else {
2136            WatermarkDirection::Descending
2137        };
2138        let clean_watermark_index_in_pk = self
2139            .pk_indices
2140            .iter()
2141            .position(|&i| i == clean_watermark_index);
2142        let clean_watermark_index_in_value = match &self.value_indices {
2143            Some(value_indices) => value_indices
2144                .iter()
2145                .position(|idx| *idx == clean_watermark_index)
2146                .ok_or_else(|| {
2147                    StreamExecutorError::from(anyhow!(
2148                        "clean watermark column index {} is not included in table value indices {:?}",
2149                        clean_watermark_index,
2150                        value_indices
2151                    ))
2152                })?,
2153            None => clean_watermark_index,
2154        };
2155
2156        let stream = self
2157            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
2158            .await?
2159            .try_filter_map(move |(pk, row)| {
2160                let should_filter =  match watermark_type {
2161                    WatermarkSerdeType::PkPrefix => unreachable!(),
2162                    WatermarkSerdeType::NonPkPrefix => {
2163                        let table_key = TableKey(pk);
2164                        let (vnode, key) = table_key.split_vnode();
2165                        let pk_cols = self.pk_serde
2166                        .deserialize(key)
2167                        .unwrap_or_else(|e| {
2168                            panic!("Failed to deserialize table {} vnode {:?} key {:?} error: {:?}", self.table_id(), vnode, key, e.as_report());
2169                        });
2170                        direction.datum_filter_by_watermark(
2171                            pk_cols.datum_at(clean_watermark_index_in_pk.unwrap()),
2172                            &watermark_value,
2173                            *order_type,
2174                        )
2175                    },
2176                    WatermarkSerdeType::Value => {
2177                        direction.datum_filter_by_watermark(
2178                            row.datum_at(clean_watermark_index_in_value),
2179                            &watermark_value,
2180                            *order_type,
2181                        )
2182                    }
2183                };
2184                if should_filter {
2185                    ready(Ok(None))
2186                } else {
2187                    ready(Ok(Some(row)))
2188                }
2189            });
2190        Ok(stream.boxed())
2191    }
2192
2193    /// Get the row from a state table with only 1 row.
2194    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
2195        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
2196        let stream = self
2197            .iter_with_prefix(row::empty(), sub_range, Default::default())
2198            .await?;
2199        pin_mut!(stream);
2200
2201        if let Some(res) = stream.next().await {
2202            let value = res?.into_owned_row();
2203            assert!(stream.next().await.is_none());
2204            Ok(Some(value))
2205        } else {
2206            Ok(None)
2207        }
2208    }
2209
2210    /// Get the row from a state table with only 1 row, and the row has only 1 col.
2211    ///
2212    /// `None` can mean either the row is never persisted, or is a persisted `NULL`,
2213    /// which does not matter in the use case.
2214    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
2215        Ok(self
2216            .get_from_one_row_table()
2217            .await?
2218            .and_then(|row| row[0].clone()))
2219    }
2220
2221    pub async fn iter_keyed_row_with_prefix(
2222        &self,
2223        pk_prefix: impl Row,
2224        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2225        prefetch_options: PrefetchOptions,
2226    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2227        Ok(
2228            self.iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
2229                .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2230        )
2231    }
2232
2233    pub async fn rev_iter_keyed_row_with_prefix(
2234        &self,
2235        pk_prefix: impl Row,
2236        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2237        prefetch_options: PrefetchOptions,
2238    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
2239        Ok(
2240            self.iter_with_prefix_inner::</* REVERSE */ true, Bytes>(pk_prefix, sub_range, prefetch_options)
2241            .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
2242        )
2243    }
2244
2245    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
2246    pub async fn rev_iter_with_prefix(
2247        &self,
2248        pk_prefix: impl Row,
2249        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2250        prefetch_options: PrefetchOptions,
2251    ) -> StreamExecutorResult<impl RowStream<'_>> {
2252        Ok(
2253            self.iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
2254                .await?.map_ok(|(_, row)| row),
2255        )
2256    }
2257
2258    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
2259        &self,
2260        pk_prefix: impl Row,
2261        sub_range: &(Bound<impl Row>, Bound<impl Row>),
2262        prefetch_options: PrefetchOptions,
2263    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
2264        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
2265        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2266
2267        // We assume that all usages of iterating the state table only access a single vnode.
2268        // If this assertion fails, then something must be wrong with the operator implementation or
2269        // the distribution derivation from the optimizer.
2270        let vnode = self.compute_prefix_vnode(&pk_prefix);
2271
2272        // Construct prefix hint for prefix bloom filter.
2273        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
2274        if self.prefix_hint_len != 0 && !IS_REPLICATED {
2275            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
2276        }
2277        let prefix_hint = {
2278            if should_calculate_prefix_hint(self.prefix_hint_len, pk_prefix.len(), true) {
2279                let encoded_prefix_len = self
2280                    .pk_serde
2281                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
2282
2283                Some(Bytes::copy_from_slice(
2284                    &encoded_prefix[..encoded_prefix_len],
2285                ))
2286            } else {
2287                None
2288            }
2289        };
2290
2291        trace!(
2292            table_id = %self.table_id(),
2293            ?prefix_hint, ?pk_prefix,
2294            ?pk_prefix_indices,
2295            iter_direction = if REVERSE { "reverse" } else { "forward" },
2296            "storage_iter_with_prefix"
2297        );
2298
2299        let memcomparable_range =
2300            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
2301
2302        Ok(if REVERSE {
2303            futures::future::Either::Left(
2304                self.row_store
2305                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2306                    .await?,
2307            )
2308        } else {
2309            futures::future::Either::Right(
2310                self.row_store
2311                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
2312                    .await?,
2313            )
2314        })
2315    }
2316
2317    /// This function scans raw key-values from the relational table with specific `pk_range` under
2318    /// the same `vnode`.
2319    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
2320        &'a self,
2321        pk_range: &(Bound<impl Row>, Bound<impl Row>),
2322        // Optional vnode that returns an iterator only over the given range under that vnode.
2323        // For now, we require this parameter, and will panic. In the future, when `None`, we can
2324        // iterate over each vnode that the `StateTableInner` owns.
2325        vnode: VirtualNode,
2326        prefetch_options: PrefetchOptions,
2327    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
2328        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
2329
2330        // TODO: provide a trace of useful params.
2331        self.row_store
2332            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
2333            .await
2334    }
2335}
2336
2337fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
2338    iter: impl StateStoreIter + 'a,
2339    deserializer: &'a impl ValueRowSerde,
2340) -> impl PkRowStream<'a, K> {
2341    iter.into_stream(move |(key, value)| {
2342        Ok((
2343            K::copy_from_slice(key.user_key.table_key.as_ref()),
2344            deserializer.deserialize(value).map(OwnedRow::new)?,
2345        ))
2346    })
2347    .map_err(Into::into)
2348}
2349
2350pub fn prefix_range_to_memcomparable(
2351    pk_serde: &OrderedRowSerde,
2352    range: &(Bound<impl Row>, Bound<impl Row>),
2353) -> (Bound<Bytes>, Bound<Bytes>) {
2354    (
2355        start_range_to_memcomparable(pk_serde, &range.0),
2356        end_range_to_memcomparable(pk_serde, &range.1, None),
2357    )
2358}
2359
2360fn prefix_and_sub_range_to_memcomparable(
2361    pk_serde: &OrderedRowSerde,
2362    sub_range: &(Bound<impl Row>, Bound<impl Row>),
2363    pk_prefix: impl Row,
2364) -> (Bound<Bytes>, Bound<Bytes>) {
2365    let (range_start, range_end) = sub_range;
2366    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2367    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
2368    let start_range = match range_start {
2369        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
2370        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
2371        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
2372    };
2373    let end_range = match range_end {
2374        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
2375        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
2376        Unbounded => Unbounded,
2377    };
2378    (
2379        start_range_to_memcomparable(pk_serde, &start_range),
2380        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
2381    )
2382}
2383
2384fn start_range_to_memcomparable<R: Row>(
2385    pk_serde: &OrderedRowSerde,
2386    bound: &Bound<R>,
2387) -> Bound<Bytes> {
2388    let serialize_pk_prefix = |pk_prefix: &R| {
2389        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2390        serialize_pk(pk_prefix, &prefix_serializer)
2391    };
2392    match bound {
2393        Unbounded => Unbounded,
2394        Included(r) => {
2395            let serialized = serialize_pk_prefix(r);
2396
2397            Included(serialized)
2398        }
2399        Excluded(r) => {
2400            let serialized = serialize_pk_prefix(r);
2401
2402            start_bound_of_excluded_prefix(&serialized)
2403        }
2404    }
2405}
2406
2407fn end_range_to_memcomparable<R: Row>(
2408    pk_serde: &OrderedRowSerde,
2409    bound: &Bound<R>,
2410    serialized_pk_prefix: Option<Bytes>,
2411) -> Bound<Bytes> {
2412    let serialize_pk_prefix = |pk_prefix: &R| {
2413        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
2414        serialize_pk(pk_prefix, &prefix_serializer)
2415    };
2416    match bound {
2417        Unbounded => match serialized_pk_prefix {
2418            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
2419            None => Unbounded,
2420        },
2421        Included(r) => {
2422            let serialized = serialize_pk_prefix(r);
2423            // TODO: may use Included(serialized)?
2424            end_bound_of_prefix(&serialized)
2425        }
2426        Excluded(r) => {
2427            let serialized = serialize_pk_prefix(r);
2428            Excluded(serialized)
2429        }
2430    }
2431}
2432
2433fn fill_non_output_indices(
2434    i2o_mapping: &ColIndexMapping,
2435    data_types: &[DataType],
2436    chunk: StreamChunk,
2437) -> StreamChunk {
2438    let cardinality = chunk.cardinality();
2439    let (ops, columns, vis) = chunk.into_inner();
2440    let mut full_columns = Vec::with_capacity(data_types.len());
2441    for (i, data_type) in data_types.iter().enumerate() {
2442        if let Some(j) = i2o_mapping.try_map(i) {
2443            full_columns.push(columns[j].clone());
2444        } else {
2445            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
2446            column_builder.append_n_null(cardinality);
2447            let column: ArrayRef = column_builder.finish().into();
2448            full_columns.push(column)
2449        }
2450    }
2451    let data_chunk = DataChunk::new(full_columns, vis);
2452    StreamChunk::from_parts(ops, data_chunk)
2453}
2454
#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    /// Renders `actual` with the pretty `Debug` formatter and compares it to the snapshot.
    fn check(actual: impl Debug, expect: Expect) {
        expect.assert_eq(&format!("{:#?}", actual));
    }

    /// A two-column chunk is expanded to three columns; the unmapped middle column is NULL.
    #[test]
    fn test_fill_non_output_indices() {
        // Input chunk carrying only two of the three table columns.
        let upstream_types = [DataType::Int32, DataType::Int32];
        let upstream_rows = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&upstream_rows, &upstream_types),
        );

        // Column 0 maps to chunk column 1, column 1 has no mapping, column 2 maps to chunk
        // column 0.
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let table_types = vec![DataType::Int32; 3];

        check(
            fill_non_output_indices(&i2o_mapping, &table_types, replicated_chunk),
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}