// risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::Hint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators only have inserts (append-only),
/// so this number does not need to be very large.
/// We may want to revisit this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// Marks a point where, in insane mode only, we randomly discard the operation and
/// return early.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    local_store: S::Local,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Row deserializer with value encoding.
    row_serde: Arc<SD>,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set are visible
    /// to the executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Used for catalog `table_properties`.
    table_option: TableOption,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache for the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as the default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, deadlock is likely to happen.
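    ///
    /// A hedged sketch of the intended call order (the surrounding executor scaffolding,
    /// including `expect_first_barrier` and the input stream, is assumed, not part of
    /// this API):
    ///
    /// ```ignore
    /// let first_barrier = expect_first_barrier(&mut input).await?;
    /// // Yield the first barrier downstream before initializing the state table.
    /// yield Message::Barrier(first_barrier.clone());
    /// // Now it is safe to initialize the epoch.
    /// state_table.init_epoch(first_barrier.epoch).await?;
    /// ```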
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches
    ///   the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table will also flush and store
    /// the old value.
    LogStoreEnabled,
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here,
// apart from `from_table_catalog_inner`, are used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a state table from the table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
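    ///
    /// A hedged usage sketch (the catalog, store, vnode bitmap, and first epoch are
    /// assumed to be obtained elsewhere):
    ///
    /// ```ignore
    /// let mut table = StateTable::from_table_catalog(&table_catalog, store, vnodes).await;
    /// table.init_epoch(first_epoch).await?;
    /// ```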
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

    /// Create a state table from the table catalog and store, with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    /// Create a state table from the table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Use `None` if `value_indices` covers all columns in order (i.e. no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema changes; in that case, the
        // row encoding should be column-aware as well. Otherwise, both will be false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to the replicated table as well; ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore the persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for the replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            epoch: None,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from the table catalog with output indices.
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        // TODO: avoid clone when `on_key_value_fn` can be non-static
        let row_serde = self.row_serde.clone();
        let row = self
            .get_inner(pk, move |_, value| Ok(row_serde.deserialize(value)?))
            .await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

    /// Get a raw encoded row from the state table.
    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        self.get_inner(pk, |_, value| Ok(Bytes::copy_from_slice(value)))
            .await
    }

    async fn get_inner<O: Send + 'static>(
        &self,
        pk: impl Row,
        on_key_value_fn: impl risingwave_storage::store::KeyValueFn<O>,
    ) -> StreamExecutorResult<Option<O>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.local_store
            .on_key_value(serialized_pk, read_options, on_key_value_fn)
            .await
            .map_err(Into::into)
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we will mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
/// mark the flag as false again; on the next `state_table.commit()`, we will assert that `on_post_commit` is false. Note that
/// `post_yield_barrier` should be called for all barriers, rather than only for barriers that update the vnode bitmap. In this way, though
/// we don't have scale tests for all streaming executors, we can ensure that all executors covered by normal e2e tests handle the
/// `StateTablePostCommit` properly.
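///
/// A hedged sketch of the expected call order in an executor (`barrier` and
/// `new_vnodes` are assumed to come from the surrounding executor logic):
///
/// ```ignore
/// // 1. Commit the state table for the new epoch.
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// // 2. Yield the barrier downstream.
/// yield Message::Barrier(barrier);
/// // 3. Apply the vnode bitmap update (if any) after the barrier has been yielded.
/// post_commit.post_yield_barrier(new_vnodes).await?;
/// ```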
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    /// Insert a row into the state table. Must provide a full row corresponding to the column
    /// desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

    /// Delete a row from the state table. Must provide a full row of the old value corresponding
    /// to the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

    /// Update a row. The old and new value should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}",
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record into the state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which must have the same schema as the table.
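    ///
    /// A hedged sketch (the chunk is assumed to match the table schema):
    ///
    /// ```ignore
    /// state_table.write_chunk(chunk);
    /// // Optionally flush buffered writes so the mem-table does not grow too large.
    /// state_table.try_flush().await?;
    /// ```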
    // Allow `izip!`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk
                .project(value_indices)
                .serialize_with(&*self.row_serde)
        } else {
            chunk.serialize_with(&*self.row_serde)
        };

        // TODO(kwannoel): It seems like we are doing the vis check twice here:
        // once below when using vis, and once here when using vis to decide whether
        // rows are empty or not. If we are to use the vis optimization, we should skip this.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

    /// Update the watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
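    ///
    /// A hedged sketch of the usual pattern (`watermark_scalar` is assumed to be the
    /// latest watermark value on the clean-watermark column):
    ///
    /// ```ignore
    /// state_table.update_watermark(watermark_scalar);
    /// // The pending watermark takes effect on the next commit, where rows below it
    /// // become eligible for state cleaning.
    /// let post_commit = state_table.commit(new_epoch).await?;
    /// ```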
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        self.local_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let table_watermarks = self.commit_pending_watermark();
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        self.epoch = Some(new_epoch);

        // Refresh the watermark cache if it is out of sync.
        if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
            if let Some(ref watermark) = self.committed_watermark {
                let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                    (Included(once(Some(watermark.clone()))), Unbounded);
                // NOTE(kwannoel): We buffer `pks` before inserting into the watermark cache
                // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
                // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
                // TODO(kwannoel): We can optimize it with:
                // 1. Either use `RefCell`.
                // 2. Or pass in a direct reference to LocalStateStore,
                //    instead of referencing it indirectly from `self`.
                //    Similar to how we do for pk_indices.
                let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
                {
                    let mut streams = vec![];
                    for vnode in self.vnodes().iter_vnodes() {
                        let stream = self
                            .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                            .await?;
                        streams.push(Box::pin(stream));
                    }
                    let merged_stream = merge_sort(streams);
                    pin_mut!(merged_stream);

                    #[for_await]
                    for entry in merged_stream.take(self.watermark_cache.capacity()) {
                        let keyed_row = entry?;
                        let pk = self.pk_serde.deserialize(keyed_row.key())?;
                        // The watermark column should be part of the pk.
                        if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                            pks.push(pk);
                        }
                    }
                }

                let mut filler = self.watermark_cache.begin_syncing();
                for pk in pks {
                    filler.insert_unchecked(DefaultOrdered(pk), ());
                }
                filler.finish();

                let n_cache_entries = self.watermark_cache.len();
                if n_cache_entries < self.watermark_cache.capacity() {
                    self.watermark_cache.set_table_row_count(n_cache_entries);
                }
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark and return vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = {
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    // The watermark cache is synced, and there's no key in it.
                    // That implies the table is empty,
                    // so we should not clean the watermark.
                    false
                }
            } else {
                // Either we are not using the watermark cache,
                // or the watermark cache is not synced.
                // In either case we should clean the watermark.
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute delete ranges.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of the delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    we just refill from the largest value of the cache, as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the given `pk_range` under the
    /// same `vnode`.
    pub async fn iter_with_vnode(
        &self,

        // The vnode to iterate under; only the given range under this vnode is returned.
        // For now, we require this parameter, and will panic otherwise. In the future, when
        // `None`, we can iterate over each vnode that the `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

    /// This function scans rows from the relational table with the given `pk_prefix` and
    /// `sub_range` under the same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it scans
    /// all rows with the given `pk_prefix`. The `pk_prefix` is used to identify the exact vnode
    /// the scan should be performed on.
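    ///
    /// A hedged sketch (assumes a pk of `(group_key, sort_key)` and scans one group):
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = state_table
    ///     .iter_with_prefix(row::once(Some(group_key)), sub_range, Default::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row = row?;
    ///     // ... consume the row ...
    /// }
    /// ```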
1354    pub async fn iter_with_prefix(
1355        &self,
1356        pk_prefix: impl Row,
1357        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1358        prefetch_options: PrefetchOptions,
1359    ) -> StreamExecutorResult<impl RowStream<'_>> {
1360        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
1361            .await?;
1362        Ok(stream.map_ok(|(_, row)| row))
1363    }
1364
1365    /// Get the row from a state table with only 1 row.
1366    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
1367        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
1368        let stream = self
1369            .iter_with_prefix(row::empty(), sub_range, Default::default())
1370            .await?;
1371        pin_mut!(stream);
1372
1373        if let Some(res) = stream.next().await {
1374            let value = res?.into_owned_row();
1375            assert!(stream.next().await.is_none());
1376            Ok(Some(value))
1377        } else {
1378            Ok(None)
1379        }
1380    }
1381
    /// Get the value from a state table that stores at most one row, where the row has only
    /// one column.
    ///
    /// `None` can mean either that the row was never persisted, or that a persisted `NULL`
    /// was read back; the two cases are indistinguishable here, which does not matter for
    /// the intended use case.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }
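
    // For example, a single-value aggregation state could hypothetically be read back as
    // `table.get_from_one_value_table().await?`, treating `None` uniformly as "no value".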

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that every iteration over the state table accesses only a single vnode.
        // If this assumption is violated, something must be wrong with either the operator
        // implementation or the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
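        // The hint is only usable when the full `prefix_hint_len` key columns are present
        // in `pk_prefix`: a shorter prefix cannot reproduce the exact bytes the bloom
        // filter was built on, so in that case we fall back to `None` and skip filtering.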
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        })
    }

    /// Scans raw key-value pairs from the relational table with the given `pk_range` under the
    /// given `vnode`.
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode under which the given `pk_range` is iterated. For now, a concrete vnode
        // must be specified. In the future, this parameter could be made optional, with `None`
        // meaning "iterate over every vnode that the `StateTableInner` owns".
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        // TODO: provide a trace of useful params.
        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn iter_log_with_vnode(
        &self,
        vnode: VirtualNode,
        epoch_range: (u64, u64),
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
        Ok(deserialize_log_stream(
            self.store
                .iter_log(
                    epoch_range,
                    memcomparable_range_with_vnode,
                    ReadLogOptions {
                        table_id: self.table_id,
                    },
                )
                .await?,
            &*self.row_serde,
        )
        .map_err(Into::into))
    }
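
    // A minimal usage sketch of `iter_log_with_vnode` (illustrative only: `table`, `vnode`,
    // and the epoch pair `(min_epoch, max_epoch)` are hypothetical names, and the exact
    // `ChangeLogRow` variant shapes are assumed):
    //
    //     let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    //     let stream = table
    //         .iter_log_with_vnode(vnode, (min_epoch, max_epoch), &pk_range)
    //         .await?;
    //     pin_mut!(stream);
    //     while let Some(change) = stream.next().await {
    //         match change? {
    //             ChangeLogRow::Insert(row) => { /* apply insert */ }
    //             ChangeLogRow::Delete(row) => { /* apply delete */ }
    //             ChangeLogRow::Update { old_value, new_value } => { /* apply update */ }
    //         }
    //     }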
}

fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}
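
// `K` controls whether the table key is materialized: callers that only need rows pass
// `()` (whose `copy_from_slice` trivially discards the bytes), while keyed iterators pass
// `Bytes` to retain the serialized key alongside each row.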

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}
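
// Worked example (hypothetical values): with `pk_prefix = (a,)` and
// `sub_range = (Included((b,)), Unbounded)`, the start bound serializes the chained row
// `(a, b)` inclusively, while the unbounded end falls back to `end_bound_of_prefix` over
// the serialized `(a,)`, yielding an end bound that covers exactly the keys carrying
// prefix `a`.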

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}
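
// Note that an `Excluded` start prefix cannot simply be serialized and kept as `Excluded`:
// longer keys sharing the excluded prefix still sort after the serialized prefix bytes and
// would leak into the range. `start_bound_of_excluded_prefix` therefore starts at the first
// key past the entire prefix.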

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

/// Fill the columns that are not in the output with `NULL`s, so that the replicated chunk
/// matches the full schema described by `data_types`. `i2o_mapping` maps each full-schema
/// column index to its position among the chunk's (output-only) columns, if any.
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            // Absent columns are filled with all-`NULL` arrays of the proper type.
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}