risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::Hint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Watermark operators are mostly append-only (inserts), so this number
/// does not need to be very large. We may want to revisit this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// Marks a point where, in insane mode only, we randomly discard the operation
/// and return early.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    local_store: S::Local,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Row deserializer with value encoding.
    row_serde: Arc<SD>,

    /// Indices of primary key.
    /// Note that the indices are based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set are visible
    /// to the executor. The table also checks whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Used for catalog `table_properties`.
    table_option: TableOption,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache for the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` uses `BasicSerde` as the default serde.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
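    ///
    /// A minimal usage sketch (not a compiled doc-test; `state_table`, `input` and the helper
    /// names are hypothetical):
    ///
    /// ```ignore
    /// // Receive and yield the first barrier before initializing the epoch.
    /// let first_barrier = expect_first_barrier(&mut input).await?;
    /// yield Message::Barrier(first_barrier.clone());
    /// state_table.init_epoch(first_barrier.epoch).await?;
    /// ```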
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

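/// Build an [`OpConsistencyLevel::ConsistentOldValue`] whose `check_old_value` callback
/// deserializes both the stored value and the passed old value with `row_serde` and compares
/// them, logging an error (instead of panicking) on deserialization failure or mismatch.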
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here,
// apart from `from_table_catalog_inner`, is used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
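    ///
    /// A minimal construction sketch (not a compiled doc-test; `table_catalog`,
    /// `memory_state_store`, `vnodes` and `first_epoch` are hypothetical):
    ///
    /// ```ignore
    /// let mut table =
    ///     StateTable::from_table_catalog(&table_catalog, memory_state_store, Some(vnodes)).await;
    /// table.init_epoch(first_epoch).await?;
    /// ```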
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `None` means the value indices cover all columns in order (i.e. no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema changes; in that case, the row
        // encoding should be column-aware as well. Otherwise, both flags are false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm not sure
        // if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for the replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            epoch: None,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of the) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
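    ///
    /// A minimal usage sketch (not a compiled doc-test; `table` and the pk datum are
    /// hypothetical):
    ///
    /// ```ignore
    /// let pk = OwnedRow::new(vec![Some(ScalarImpl::Int64(42))]);
    /// if let Some(row) = table.get_row(&pk).await? {
    ///     // `row` is the full row (projected to output indices if the table is replicated).
    /// }
    /// ```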
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        // TODO: avoid clone when `on_key_value_fn` can be non-static
        let row_serde = self.row_serde.clone();
        let row = self
            .get_inner(pk, move |_, value| Ok(row_serde.deserialize(value)?))
            .await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

    /// Get a raw encoded row from the state table.
    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        self.get_inner(pk, |_, value| Ok(Bytes::copy_from_slice(value)))
            .await
    }

    async fn get_inner<O: Send + 'static>(
        &self,
        pk: impl Row,
        on_key_value_fn: impl risingwave_storage::store::KeyValueFn<O>,
    ) -> StreamExecutorResult<Option<O>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.local_store
            .on_key_value(serialized_pk, read_options, on_key_value_fn)
            .await
            .map_err(Into::into)
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle a configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we
/// reset the flag to false; on the next `state_table.commit()`, we assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers rather than only for barriers that update the vnode bitmap. In this way, though we don't have scale
/// tests for all streaming executors, we can ensure that all executors covered by the normal e2e tests have properly handled the `StateTablePostCommit`.
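///
/// A minimal sketch of the expected call order (not a compiled doc-test; `state_table`, `barrier`
/// and the way the vnode bitmap update is obtained are hypothetical):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier.clone());
/// // Apply the vnode bitmap update (if any) only after the barrier has been yielded.
/// post_commit.post_yield_barrier(new_vnode_bitmap).await?;
/// ```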
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatch"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    /// Insert a row into the state table. Must provide a full row corresponding to the column
    /// desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

    /// Delete a row from the state table. Must provide a full row of the old value corresponding
    /// to the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

    /// Update a row. The old and new values should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}",
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record into the state table. Must have the same schema as the table.
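    ///
    /// A minimal usage sketch (not a compiled doc-test; `table` and the rows are hypothetical):
    ///
    /// ```ignore
    /// table.write_record(Record::Insert { new_row: &row });
    /// table.write_record(Record::Update { old_row: &row, new_row: &new_row });
    /// table.write_record(Record::Delete { old_row: &new_row });
    /// ```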
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
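    ///
    /// A minimal usage sketch (not a compiled doc-test; `table` is hypothetical and the chunk is
    /// built with the test helper `StreamChunk::from_pretty`, assuming a two-column int schema):
    ///
    /// ```ignore
    /// // Each row carries an op (`+` insert, `-` delete) together with its values.
    /// let chunk = StreamChunk::from_pretty(
    ///     "  I I
    ///      + 1 10
    ///      - 2 20",
    /// );
    /// table.write_chunk(chunk);
    /// ```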
    // Allow `izip`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk
                .project(value_indices)
                .serialize_with(&*self.row_serde)
        } else {
            chunk.serialize_with(&*self.row_serde)
        };

        // TODO(kwannoel): It seems like we do the visibility check twice here:
        // once below when consuming the visibility bitmap, and once here
        // when using it to decide whether a row is present.
        // If we adopt the visibility optimization, we should skip this.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

    /// Update watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
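    ///
    /// A minimal usage sketch (not a compiled doc-test; `table`, `ts` and `new_epoch` are
    /// hypothetical). The watermark is only buffered here; the actual cleaning is scheduled when
    /// `commit` is called:
    ///
    /// ```ignore
    /// table.update_watermark(ScalarImpl::Int64(ts));
    /// let post_commit = table.commit(new_epoch).await?; // state below `ts` gets cleaned
    /// ```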
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }
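
    /// Commit the buffered mutations to the state store and seal the current epoch.
    /// Returns a [`StateTablePostCommit`], whose `post_yield_barrier` must be called after the
    /// barrier has been yielded (see the doc of [`StateTablePostCommit`]).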
    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        self.local_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let table_watermarks = self.commit_pending_watermark();
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        self.epoch = Some(new_epoch);

        // Refresh the watermark cache if it is out of sync.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            // NOTE(kwannoel): We buffer `pks` before inserting into the watermark cache
            // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
            // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
            // TODO(kwannoel): We can optimize this:
            // 1. Either use `RefCell`.
            // 2. Or pass in a direct reference to the LocalStateStore,
            //    instead of referencing it indirectly from `self`.
            //    Similar to how we do it for pk_indices.
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // The watermark column should be part of the pk.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit pending watermark and return vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
            if let Some(key) = self.watermark_cache.lowest_key() {
                watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
            } else {
                // The watermark cache is synced and there is no key in it,
                // which implies that the table is empty,
                // so we should not clean the watermark.
                false
            }
        } else {
            // Either we are not using the watermark cache,
            // or the watermark cache is not synced.
            // In either case we should clean the watermark.
            true
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute the delete ranges.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            let direction = if order_type.is_ascending() {
                WatermarkDirection::Ascending
            } else {
                WatermarkDirection::Descending
            };
            Some((
                direction,
                VnodeWatermark::new(
                    self.vnodes().clone(),
                    Bytes::copy_from_slice(watermark_suffix.as_ref()),
                ),
                watermark_type,
            ))
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of the delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    we just refill from the largest value of the cache, as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under the
    /// same `vnode`.
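    ///
    /// A minimal usage sketch (not a compiled doc-test; `table` and `vnode` are hypothetical):
    ///
    /// ```ignore
    /// let range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_vnode(vnode, &range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row: OwnedRow = row?;
    /// }
    /// ```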
    pub async fn iter_with_vnode(
        &self,

        // The vnode that the iterator scans the given range under. For now, we require this
        // parameter. In the future, when it is `None`, we can iterate over each vnode that the
        // `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

    /// This function scans rows from the relational table with the specific `prefix` and `sub_range` under the same
    /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with the specific `pk_prefix`.
    /// `pk_prefix` is used to identify the exact vnode that the scan should perform on.
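    ///
    /// A minimal usage sketch (not a compiled doc-test; `table` and the prefix datum are
    /// hypothetical):
    ///
    /// ```ignore
    /// let prefix = OwnedRow::new(vec![Some(ScalarImpl::Int64(1))]);
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(&prefix, sub_range, PrefetchOptions::default())
    ///     .await?;
    /// ```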
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table with only 1 row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the row from a state table with only 1 row, where the row has only 1 column.
    ///
    /// `None` can mean either that the row has never been persisted, or that it is a persisted `NULL`,
    /// which does not matter in the use case.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }
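    /// Same as [`Self::iter_with_prefix`], but each yielded item also carries the
    /// serialized table key of the row, for callers that need the encoded pk
    /// (e.g. to maintain a cache keyed by the encoded key).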
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like [`Self::iter_with_prefix`], but in reverse order.
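    ///
    /// A minimal sketch (illustrative only; assumes the pk is `(group_key, sort_key)`
    /// with `sort_key` ordered ascending, so the first item of the reverse scan is the
    /// row with the largest `sort_key` of the group):
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .rev_iter_with_prefix(&group_key, sub_range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// let newest = stream.next().await.transpose()?;
    /// ```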
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that every use of state-table iteration accesses only a single vnode.
        // If this assumption is violated, something must be wrong with the operator
        // implementation or with the distribution derived by the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

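        // The forward and reverse iterators are distinct concrete types, so the two
        // branches below are unified into one return type through
        // `futures::future::Either`, which itself implements `Stream` when both of
        // its variants do.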
        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        })
    }

    /// This function scans raw key-values from the relational table with the given
    /// `pk_range` under a single `vnode`.
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode under which the scan is performed; only keys in `pk_range` under
        // this vnode are returned. The parameter is currently required. In the future
        // it could become optional, with `None` meaning to iterate over every vnode
        // that the `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        // TODO: provide a trace of useful params.
        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
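    /// Iterate over the change log of this table for the given `vnode` and `pk_range`,
    /// restricted to `epoch_range`. Each item is a [`ChangeLogRow`] describing one
    /// logged change.
    ///
    /// A minimal sketch (illustrative only; `vnode`, `prev_epoch`, and `curr_epoch`
    /// are caller-provided):
    ///
    /// ```ignore
    /// let pk_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_log_with_vnode(vnode, (prev_epoch, curr_epoch), pk_range)
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(change) = stream.next().await {
    ///     let change: ChangeLogRow = change?;
    ///     // Apply `change` (an insert / delete / update record) downstream.
    /// }
    /// ```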
    pub async fn iter_log_with_vnode(
        &self,
        vnode: VirtualNode,
        epoch_range: (u64, u64),
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
        Ok(deserialize_log_stream(
            self.store
                .iter_log(
                    epoch_range,
                    memcomparable_range_with_vnode,
                    ReadLogOptions {
                        table_id: self.table_id,
                    },
                )
                .await?,
            &*self.row_serde,
        )
        .map_err(Into::into))
    }
}
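/// Adapt a raw storage iterator into a stream of `(key, row)` pairs: the table key is
/// copied into `K` (`Bytes` when the caller needs the key, `()` to discard it) and the
/// value is deserialized into an [`OwnedRow`].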
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}
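/// Serialize a range over pk prefixes into a range of memcomparable bytes, adjusting
/// the bounds so that a prefix bound covers every full key extending that prefix (see
/// `start_range_to_memcomparable` and `end_range_to_memcomparable`).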
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}
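/// Like `prefix_range_to_memcomparable`, but the range is given as a fixed `pk_prefix`
/// plus a `sub_range` over the pk columns that follow the prefix. Bound rows are built
/// by chaining the prefix with the sub-range bound; an `Unbounded` end is tightened to
/// the end bound of `pk_prefix`, so the scan never leaves the prefix.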
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    // `Either` unifies the two bound row types (the prefix chained with a sub-range
    // bound vs. the bare prefix), as both variants implement `Row`.
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}
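/// Serialize the start bound of a pk-prefix range. An `Excluded` start prefix must
/// skip the prefix *and* every key extending it, so it is converted with
/// `start_bound_of_excluded_prefix` instead of a plain `Excluded` bound.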
fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}
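/// Serialize the end bound of a pk-prefix range. An `Included` end prefix covers every
/// key extending it, so it becomes `end_bound_of_prefix`; an `Unbounded` end is capped
/// at the end of `serialized_pk_prefix` when one is supplied.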
fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}
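/// Expand a replicated chunk that contains only the output columns back to the full
/// table schema: columns present in `i2o_mapping` are reused as-is, while the missing
/// columns are filled with `NULL`s of the corresponding type.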
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}