risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::CacheHint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators are append-only (insert-only),
/// so this number should not need to be very large.
/// We may want to revisit this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
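///
/// A minimal lifecycle sketch (hypothetical caller code; names are illustrative):
/// ```ignore
/// // After receiving and yielding the first barrier:
/// table.init_epoch(first_epoch).await?;
/// // Mutate within an epoch, then commit on the next barrier.
/// table.insert(row);
/// let post_commit = table.commit(new_epoch).await?;
/// // Yield the barrier downstream before finishing the commit.
/// post_commit.post_yield_barrier(None).await?;
/// ```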
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    local_store: S::Local,

    /// State store for accessing snapshot data.
    store: S,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Row deserializer with value encoding.
    row_serde: SD,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set will be
    /// visible to the executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Used for catalog `table_properties`.
    table_option: TableOption,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Used to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row matches the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as the default serde.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: impl ValueRowSerde,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here,
// apart from `from_table_catalog_inner`, should be used by a replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
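    ///
    /// A minimal construction sketch (hypothetical caller code; `table_catalog`, `store`, and
    /// `vnodes` are assumed to come from the executor build context):
    /// ```ignore
    /// let table = StateTable::from_table_catalog(&table_catalog, store, Some(vnodes)).await;
    /// ```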
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Check if `value_indices` is exactly the full column list in order (the "no shuffle" case).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let make_row_serde = || {
            SD::new(
                Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
                Arc::from(table_columns.clone().into_boxed_slice()),
            )
        };

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                let row_serde = make_row_serde();
                consistent_old_value_op(row_serde, false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                let row_serde = make_row_serde();
                consistent_old_value_op(row_serde, true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        let row_serde = make_row_serde();

        // If the state table has versioning, that means it supports
        // schema change. In that case, the row encoding should be column-aware as well.
        // Otherwise both will be false.
        // NOTE(kwannoel): A replicated table will follow the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore the persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for the replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the newest epoch of the state store. Panics if `init_epoch()` has never been called.
    pub fn epoch(&self) -> u64 {
        self.local_store.epoch()
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    fn is_dirty(&self) -> bool {
        self.local_store.is_dirty() || self.pending_watermark.is_some()
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from a table catalog with output indices.
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let encoded_row: Option<Bytes> = self.get_encoded_row(pk).await?;
        match encoded_row {
            Some(encoded_row) => {
                let row = self.row_serde.deserialize(&encoded_row)?;
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

    /// Get a raw encoded row from the state table.
    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            table_id: self.table_id,
            cache_policy: CachePolicy::Fill(CacheHint::Normal),
            ..Default::default()
        };

        self.local_store
            .get(serialized_pk, read_options)
            .await
            .map_err(Into::into)
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we will mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
/// mark the flag as false again; on the next `state_table.commit()`, we will assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers rather than only for barriers that update the vnode bitmap. In this way, though we don't have scale
/// tests for all streaming executors, we can ensure that all executors covered by the normal e2e tests have properly handled the `StateTablePostCommit`.
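///
/// A minimal sketch of the expected call order (hypothetical executor code):
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier);
/// // Must be called for every barrier, passing the new vnode bitmap if there is one.
/// post_commit.post_yield_barrier(new_vnodes).await?;
/// ```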
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        assert!(
            !self.inner.is_dirty(),
            "vnode bitmap should only be updated when state table is clean"
        );
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&self.row_serde),
                    new.debug_fmt(&self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    /// Insert a row into the state table. Must provide a full row corresponding to the column
    /// desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

    /// Delete a row from the state table. Must provide a full row of the old value corresponding
    /// to the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

    /// Update a row. The old and new value should have the same pk.
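    ///
    /// A minimal sketch (hypothetical two-column table with pk = column 0):
    /// ```ignore
    /// let old = OwnedRow::new(vec![Some(ScalarImpl::Int64(1)), Some(ScalarImpl::Int64(10))]);
    /// let new = OwnedRow::new(vec![Some(ScalarImpl::Int64(1)), Some(ScalarImpl::Int64(20))]);
    /// table.update(old, new); // same pk, different value column
    /// ```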
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}",
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record into the state table. Must have the same schema as the table.
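    ///
    /// A minimal sketch (hypothetical row value):
    /// ```ignore
    /// table.write_record(Record::Insert { new_row: row });
    /// ```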
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // Allow `izip!`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk.project(value_indices).serialize_with(&self.row_serde)
        } else {
            chunk.serialize_with(&self.row_serde)
        };

        // TODO(kwannoel): Seems like we are doing the vis check twice here.
        // Once below, when using vis, and once here,
        // when using vis to set rows empty or not.
        // If we are to use the vis optimization, we should skip this.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

    /// Update the watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
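    ///
    /// A minimal sketch (hypothetical watermark value; cleaning happens on commit):
    /// ```ignore
    /// table.update_watermark(ScalarImpl::Int64(ts));
    /// let post_commit = table.commit(new_epoch).await?; // states below the watermark are cleaned here
    /// ```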
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(self.epoch(), new_epoch.prev);
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch(),
            "commit state table"
        );

        let mut table_watermarks = None;
        if self.is_dirty() {
            self.local_store
                .flush()
                .instrument(tracing::info_span!("state_table_flush"))
                .await?;
            table_watermarks = self.commit_pending_watermark();
        }
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );

        // Refresh the watermark cache if it is out of sync.
        if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
            if let Some(ref watermark) = self.committed_watermark {
                let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                    (Included(once(Some(watermark.clone()))), Unbounded);
                // NOTE(kwannoel): We buffer `pks` before inserting into the watermark cache
                // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
                // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
                // TODO(kwannoel): We can optimize this by:
                // 1. Either using `RefCell`.
                // 2. Or passing in a direct reference to the `LocalStateStore`,
                //    instead of referencing it indirectly from `self`.
                //    Similar to how we do it for `pk_indices`.
                let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
                {
                    let mut streams = vec![];
                    for vnode in self.vnodes().iter_vnodes() {
                        let stream = self
                            .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                            .await?;
                        streams.push(Box::pin(stream));
                    }
                    let merged_stream = merge_sort(streams);
                    pin_mut!(merged_stream);

                    #[for_await]
                    for entry in merged_stream.take(self.watermark_cache.capacity()) {
                        let keyed_row = entry?;
                        let pk = self.pk_serde.deserialize(keyed_row.key())?;
                        // The watermark column should be part of the pk.
                        if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                            pks.push(pk);
                        }
                    }
                }

                let mut filler = self.watermark_cache.begin_syncing();
                for pk in pks {
                    filler.insert_unchecked(DefaultOrdered(pk), ());
                }
                filler.finish();

                let n_cache_entries = self.watermark_cache.len();
                if n_cache_entries < self.watermark_cache.capacity() {
                    self.watermark_cache.set_table_row_count(n_cache_entries);
                }
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark and return vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take();
        watermark.as_ref().inspect(|watermark| {
            trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
        });

        let watermark_serializer = if self.pk_indices().is_empty() {
            None
        } else {
            match self.clean_watermark_index_in_pk {
                None => Some(self.pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(self.pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = match watermark {
            Some(ref watermark) => {
                if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                    if let Some(key) = self.watermark_cache.lowest_key() {
                        watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                    } else {
                        // The watermark cache is synced,
                        // and there's no key in the watermark cache.
                        // That implies the table is empty.
                        // We should not clean the watermark.
                        false
                    }
                } else {
                    // Either we are not using the watermark cache,
                    // or the watermark cache is not synced.
                    // In either case we should clean the watermark.
                    true
                }
            }
            None => false,
        };

        let watermark_suffix = watermark.as_ref().map(|watermark| {
            serialize_pk(
                row::once(Some(watermark.clone())),
                watermark_serializer.as_ref().unwrap(),
            )
        });

        let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark, WatermarkSerdeType)> =
            None;

        // Compute delete ranges.
        if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer
                .as_ref()
                .unwrap()
                .get_order_types()
                .get(0)
                .unwrap();

            if order_type.is_ascending() {
                seal_watermark = Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ));
            } else {
                seal_watermark = Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ));
            }
        }
        self.committed_watermark = watermark;

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of the delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    we just refill from the largest value of the cache, as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under the
    /// same `vnode`.
    pub async fn iter_with_vnode(
        &self,

        // Optional vnode that returns an iterator only over the given range under that vnode.
        // For now, we require this parameter, and will panic. In the future, when `None`, we can
        // iterate over each vnode that the `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            table_id: self.table_id,
            prefetch_options,
            cache_policy: CachePolicy::Fill(CacheHint::Normal),
            ..Default::default()
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            table_id: self.table_id,
            prefetch_options,
            cache_policy: CachePolicy::Fill(CacheHint::Normal),
            ..Default::default()
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

    /// This function scans rows from the relational table with the specific `prefix` and `sub_range` under the same
    /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with the specific `pk_prefix`.
    /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
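    ///
    /// A minimal sketch (hypothetical prefix; `group_key` is assumed to be a pk prefix):
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(&group_key, sub_range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row = row?;
    ///     // process row...
    /// }
    /// ```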
1369    pub async fn iter_with_prefix(
1370        &self,
1371        pk_prefix: impl Row,
1372        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1373        prefetch_options: PrefetchOptions,
1374    ) -> StreamExecutorResult<impl RowStream<'_>> {
1375        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
1376            .await?;
1377        Ok(stream.map_ok(|(_, row)| row))
1378    }
1379
1380    /// Get the row from a state table with only 1 row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the single value from a state table that holds at most one row with exactly one column.
    ///
    /// `None` can mean either that the row was never persisted, or that a persisted `NULL` was
    /// read back; the two cases are indistinguishable, which does not matter in the use case.
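    ///
    /// A usage sketch (assumes a hypothetical single-value table, e.g. one backing a
    /// simple global aggregation):
    ///
    /// ```ignore
    /// let value: Option<ScalarImpl> = state_table.get_from_one_value_table().await?;
    /// ```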
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

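    /// Like `iter_with_prefix`, but yields `KeyedRow`s that carry the serialized table key
    /// alongside each deserialized row.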
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assertion fails, then something must be wrong with the operator implementation or
        // the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct prefix hint for prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
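        // A hint can only be formed when the scan prefix covers at least `prefix_hint_len`
        // pk columns; otherwise we fall back to scanning without a bloom-filter hint.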
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &self.row_serde,
            ))
        })
    }

    /// Scans raw key-value pairs from the relational table with the given `pk_range` under the
    /// same `vnode`.
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // Vnode under which the iteration is performed: the returned iterator only covers the
        // given range within this vnode. For now, this parameter is required; in the future, an
        // absent vnode could mean iterating over every vnode that the `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        // TODO: provide a trace of useful params.
        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
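    /// Iterates over the change log of this table under the given `vnode`, yielding
    /// `ChangeLogRow`s for epochs within `epoch_range`. The `pk_range` is scoped to that
    /// vnode, mirroring `iter_with_vnode`.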
    pub async fn iter_log_with_vnode(
        &self,
        vnode: VirtualNode,
        epoch_range: (u64, u64),
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
        Ok(deserialize_log_stream(
            self.store
                .iter_log(
                    epoch_range,
                    memcomparable_range_with_vnode,
                    ReadLogOptions {
                        table_id: self.table_id,
                    },
                )
                .await?,
            &self.row_serde,
        )
        .map_err(Into::into))
    }
}

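/// Turns a raw state-store iterator into a stream of `(key, row)` pairs, copying each table
/// key into `K` and deserializing each value with the given row serde.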
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

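/// Encodes `pk_prefix` chained with `sub_range` into a memcomparable key range. An unbounded
/// start degenerates to `Included(pk_prefix)` and an unbounded end to the end bound of the
/// serialized prefix, so the resulting range never escapes the prefix.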
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

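/// Serializes the start bound of a pk(-prefix) range. An `Excluded` prefix is mapped to the
/// first key strictly greater than every key carrying that prefix.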
fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

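/// Serializes the end bound of a pk(-prefix) range. An `Included` prefix widens to the end
/// bound of that prefix so that all keys carrying it are covered; an unbounded end falls back
/// to the end bound of `serialized_pk_prefix`, if one is given.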
fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

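/// Rebuilds a replicated chunk into the full schema: columns visible in the output are mapped
/// through `i2o_mapping`, while the remaining columns are filled with all-NULL columns of the
/// corresponding data type.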
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}