risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;

use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::Hint;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::{Itertools, izip};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, TableKeyRange, end_bound_of_prefix, prefixed_range_with_vnode,
    start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
    ChangeLogRow, KeyedRow, TableDistribution, deserialize_log_stream,
};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators only have inserts (append-only), so this number does not
/// need to be very large. We may revisit this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface accessing relational data in KV(`StateStore`) with
/// row-based encoding.
#[derive(Clone)]
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    local_store: S::Local,

    /// State store for accessing snapshot data
    store: S,

    /// Current epoch
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Row deserializer with value encoding
    row_serde: Arc<SD>,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set are
    /// visible to the executor. The table also checks whether written rows conform to
    /// this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    /// Used for catalog `table_properties`
    table_option: TableOption,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache for the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns, used to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state
    /// table, such that the schema of this row matches the full schema of the base state
    /// table. It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as default
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate upstream shared buffer.
/// Used for `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
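    ///
    /// A minimal calling sketch (hypothetical executor context: `upstream`, `expect_first_barrier`
    /// and `Message` are assumptions, not definitions from this file):
    ///
    /// ```ignore
    /// // Receive and yield the first barrier before initializing the table.
    /// let first_barrier = expect_first_barrier(&mut upstream).await?;
    /// yield Message::Barrier(first_barrier.clone());
    /// state_table.init_epoch(first_barrier.epoch).await?;
    /// ```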
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.local_store.init(InitOptions::new(epoch)).await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that in `LogStoreEnabled`, the state table should also flush and
    /// store the old value.
    LogStoreEnabled,
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, should be used by a replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
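    ///
    /// A construction sketch (the `table_catalog`, `store`, and `vnodes` handles are assumed to
    /// come from the executor build context):
    ///
    /// ```ignore
    /// let mut state_table =
    ///     StateTable::from_table_catalog(&table_catalog, store, Some(vnodes)).await;
    /// ```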
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::ConsistentOldValue,
        )
        .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        Self::from_table_catalog_with_consistency_level(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
        )
        .await
    }

    pub async fn from_table_catalog_with_consistency_level(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Use `None` if `value_indices` covers all columns in order (i.e. no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema change; in that case, the
        // row encoding should be column-aware as well. Otherwise both will be false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm
        // not sure if ALTER TABLE will propagate to the replicated table as well; ideally
        // it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute i2o mapping
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any 1 of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            local_store: local_state_store,
            store,
            epoch: None,
            pk_serde,
            row_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            table_option,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key
    fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check pk indices of upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices
    pub async fn from_table_catalog_with_output_column_ids(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        Self::from_table_catalog_inner(
            table_catalog,
            store,
            vnodes,
            StateTableOpConsistencyLevel::Inconsistent,
            output_column_ids,
        )
        .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from state table.
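    ///
    /// A usage sketch, assuming a hypothetical table whose primary key is a single `int64` column:
    ///
    /// ```ignore
    /// let pk = row::once(Some(ScalarImpl::Int64(42)));
    /// if let Some(row) = state_table.get_row(pk).await? {
    ///     // `row` is the full row (projected to the output indices if replicated).
    /// }
    /// ```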
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        // TODO: avoid clone when `on_key_value_fn` can be non-static
        let row_serde = self.row_serde.clone();
        let row = self
            .get_inner(pk, move |_, value| Ok(row_serde.deserialize(value)?))
            .await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(OwnedRow::new(row)))
                }
            }
            None => Ok(None),
        }
    }

    /// Get a raw encoded row from state table.
    pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
        self.get_inner(pk, |_, value| Ok(Bytes::copy_from_slice(value)))
            .await
    }

    async fn get_inner<O: Send + 'static>(
        &self,
        pk: impl Row,
        on_key_value_fn: impl risingwave_storage::store::KeyValueFn<O>,
    ) -> StreamExecutorResult<Option<O>> {
        assert!(pk.len() <= self.pk_indices.len());

        let serialized_pk =
            serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));

        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.local_store
            .on_key_value(serialized_pk, read_options, on_key_value_fn)
            .await
            .map_err(Into::into)
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we will mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
/// reset the flag to false; on the next `state_table.commit()`, we will assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers rather than only for the barrier with a vnode bitmap update. In this way, though we don't have scale
/// tests for all streaming executors, we can ensure that all executors covered by normal e2e tests have properly handled the `StateTablePostCommit`.
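///
/// A minimal sketch of the expected call order in an executor loop (`barrier` is an assumption;
/// `new_vnodes` would be extracted from the barrier mutation when this actor is rescheduled):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier);
/// // Must be called for every barrier, with or without a vnode bitmap update.
/// let new_vnodes: Option<Arc<Bitmap>> = None; // or Some(...) from the mutation
/// post_commit.post_yield_barrier(new_vnodes).await?;
/// ```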
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returning the previous vnode bitmap
    /// and whether the cache may be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .local_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id(),
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn serialize_value(&self, value: impl Row) -> Bytes {
        if let Some(value_indices) = self.value_indices.as_ref() {
            self.row_serde
                .serialize(value.project(value_indices))
                .into()
        } else {
            self.row_serde.serialize(value).into()
        }
    }

    fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
        insane_mode_discard_point!();
        self.local_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update_inner(
        &mut self,
        key_bytes: TableKey<Bytes>,
        old_value_bytes: Option<Bytes>,
        new_value_bytes: Bytes,
    ) {
        insane_mode_discard_point!();
        self.local_store
            .insert(key_bytes, new_value_bytes, old_value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    /// Insert a row into state table. Must provide a full row corresponding to the column desc of
    /// the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(value);
        self.insert_inner(key_bytes, value_bytes);
    }

    /// Delete a row from state table. Must provide a full row of the old value corresponding to
    /// the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
        let value_bytes = self.serialize_value(old_value);
        self.delete_inner(key_bytes, value_bytes);
    }

    /// Update a row. The old and new value should have the same pk.
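    ///
    /// A usage sketch, assuming a hypothetical two-column table `(id, count)` with `id` as the
    /// primary key:
    ///
    /// ```ignore
    /// let old = OwnedRow::new(vec![Some(1i64.into()), Some(10i64.into())]);
    /// let new = OwnedRow::new(vec![Some(1i64.into()), Some(11i64.into())]);
    /// state_table.update(old, new); // same pk, new value
    /// ```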
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let new_key_bytes =
            serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
        let old_value_bytes = self.serialize_value(old_value);
        let new_value_bytes = self.serialize_value(new_value);

        self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
    }

    /// Write a record into state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // Allow `izip!`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };
        let (chunk, op) = chunk.into_parts();

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        let values = if let Some(ref value_indices) = self.value_indices {
            chunk
                .project(value_indices)
                .serialize_with(&*self.row_serde)
        } else {
            chunk.serialize_with(&*self.row_serde)
        };

        // TODO(kwannoel): Seems like we are doing the vis check twice here.
        // Once below, when using vis, and once here,
        // when using vis to set rows empty or not.
        // If we are to use the vis optimization, we should skip this.
        let key_chunk = chunk.project(self.pk_indices());
        let vnode_and_pks = key_chunk
            .rows_with_holes()
            .zip_eq_fast(vnodes.iter())
            .map(|(r, vnode)| {
                let mut buffer = BytesMut::new();
                buffer.put_slice(&vnode.to_be_bytes()[..]);
                if let Some(r) = r {
                    self.pk_serde.serialize(r, &mut buffer);
                }
                (r, buffer.freeze())
            })
            .collect_vec();

        if !key_chunk.is_compacted() {
            for ((op, (key, key_bytes), value), vis) in
                izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
            {
                if vis {
                    match op {
                        Op::Insert | Op::UpdateInsert => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.insert(pk);
                            }
                            self.insert_inner(TableKey(key_bytes), value);
                        }
                        Op::Delete | Op::UpdateDelete => {
                            if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                                self.watermark_cache.delete(pk);
                            }
                            self.delete_inner(TableKey(key_bytes), value);
                        }
                    }
                }
            }
        } else {
            for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
                match op {
                    Op::Insert | Op::UpdateInsert => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.insert(pk);
                        }
                        self.insert_inner(TableKey(key_bytes), value);
                    }
                    Op::Delete | Op::UpdateDelete => {
                        if USE_WATERMARK_CACHE && let Some(ref pk) = key {
                            self.watermark_cache.delete(pk);
                        }
                        self.delete_inner(TableKey(key_bytes), value);
                    }
                }
            }
        }
    }

    /// Update watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
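    ///
    /// A usage sketch: the watermark only takes effect on the next commit (the watermark column
    /// is assumed to be the first pk column, ascending; `event_time` is a placeholder):
    ///
    /// ```ignore
    /// state_table.update_watermark(ScalarImpl::Int64(event_time));
    /// // State below the watermark is scheduled for cleaning by this commit.
    /// let post_commit = state_table.commit(new_epoch).await?;
    /// ```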
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            info!(
                ?new_epoch,
                prev_op_consistency_level = ?self.op_consistency_level,
                ?op_consistency_level,
                table_id = self.table_id.table_id,
                "switch to new op consistency level"
            );
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
            match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            }
        });
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        self.local_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let table_watermarks = self.commit_pending_watermark();
        self.local_store.seal_current_epoch(
            new_epoch.curr,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        self.epoch = Some(new_epoch);

        // Refresh watermark cache if it is out of sync.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            // NOTE(kwannoel): We buffer `pks` before inserting into the watermark cache
            // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
            // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
            // TODO(kwannoel): We can optimize it with:
            // 1. Either use `RefCell`.
            // 2. Or pass in a direct reference to LocalStateStore,
            //    instead of referencing it indirectly from `self`.
            //    Similar to how we do for pk_indices.
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // The watermark column should be part of the pk.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark and return vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = {
            match self.clean_watermark_index_in_pk {
                None => self.pk_serde.index(0),
                Some(clean_watermark_index_in_pk) => {
                    self.pk_serde.index(clean_watermark_index_in_pk as usize)
                }
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = {
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    // The watermark cache is synced and there's no key in it.
                    // That implies the table is empty, so we should not clean the watermark.
                    false
                }
            } else {
                // Either we are not using the watermark cache, or it is not synced.
                // In either case we should clean the watermark.
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute delete ranges.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().get(0).unwrap();

            if order_type.is_ascending() {
                Some((
                    WatermarkDirection::Ascending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            } else {
                Some((
                    WatermarkDirection::Descending,
                    VnodeWatermark::new(
                        self.vnodes().clone(),
                        Bytes::copy_from_slice(watermark_suffix.as_ref()),
                    ),
                    watermark_type,
                ))
            }
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    we just refill from the largest value of the cache, as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, is_non_pk_prefix)| {
            (direction, vec![watermark], is_non_pk_prefix)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.local_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under the
    /// same `vnode`.
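    ///
    /// A usage sketch iterating every owned vnode over the full pk range (mirroring how the
    /// watermark cache is refilled in `commit_inner`):
    ///
    /// ```ignore
    /// let range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// for vnode in state_table.vnodes().iter_vnodes() {
    ///     let stream = state_table
    ///         .iter_with_vnode(vnode, &range, PrefetchOptions::default())
    ///         .await?;
    ///     // consume `stream` here; rows are in pk order within the vnode
    /// }
    /// ```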
    pub async fn iter_with_vnode(
        &self,

        // Optional vnode that returns an iterator only over the given range under that vnode.
        // For now, this parameter is required, and we will panic if it cannot be derived. In the
        // future, when `None`, we can iterate over each vnode that the `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(deserialize_keyed_row_stream::<'_, ()>(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(deserialize_keyed_row_stream(
            self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
                .await?,
            &*self.row_serde,
        )
        .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }

    async fn iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self.local_store.iter(table_key_range, read_options).await?)
    }

    async fn rev_iter_kv(
        &self,
        table_key_range: TableKeyRange,
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(self
            .local_store
            .rev_iter(table_key_range, read_options)
            .await?)
    }

    /// This function scans rows from the relational table with the specific `prefix` and `sub_range` under the same
    /// `vnode`. If `sub_range` is (Unbounded, Unbounded), it scans rows from the relational table with the specific `pk_prefix`.
    /// `pk_prefix` is used to identify the exact vnode the scan should perform on.
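    ///
    /// A usage sketch, assuming a hypothetical pk of `(group_key, sort_key)` and scanning all
    /// rows of one group:
    ///
    /// ```ignore
    /// let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    /// let stream = state_table
    ///     .iter_with_prefix(row::once(Some(group_key)), sub_range, Default::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row = row?; // rows arrive in pk order within the prefix
    /// }
    /// ```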
1357    pub async fn iter_with_prefix(
1358        &self,
1359        pk_prefix: impl Row,
1360        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1361        prefetch_options: PrefetchOptions,
1362    ) -> StreamExecutorResult<impl RowStream<'_>> {
1363        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
1364            .await?;
1365        Ok(stream.map_ok(|(_, row)| row))
1366    }
1367
1368    /// Get the row from a state table with only 1 row.
1369    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
1370        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
1371        let stream = self
1372            .iter_with_prefix(row::empty(), sub_range, Default::default())
1373            .await?;
1374        pin_mut!(stream);
1375
1376        if let Some(res) = stream.next().await {
1377            let value = res?.into_owned_row();
1378            assert!(stream.next().await.is_none());
1379            Ok(Some(value))
1380        } else {
1381            Ok(None)
1382        }
1383    }

    /// Get the value from a state table that holds at most one row with a single column.
    ///
    /// `None` can mean either that the row was never persisted, or that a `NULL` was
    /// persisted, which does not matter in the intended use cases.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

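    /// This function scans the table just like [`Self::iter_with_prefix`], but also
    /// yields the serialized table key of each row.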
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like [`Self::iter_with_prefix`], but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

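    /// Shared implementation of the prefix scans above. It serializes `pk_prefix`,
    /// derives the vnode from it, constructs a prefix hint for the bloom filter when
    /// `prefix_hint_len` is applicable, and then iterates forward or backward depending
    /// on `REVERSE`. `K` chooses whether the serialized table key is materialized
    /// (`Bytes`) or discarded (`()`).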
    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assumption is violated, something must be wrong with the operator
        // implementation or the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct prefix hint for prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        Ok(if REVERSE {
            futures::future::Either::Left(deserialize_keyed_row_stream(
                self.rev_iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        } else {
            futures::future::Either::Right(deserialize_keyed_row_stream(
                self.iter_kv(
                    memcomparable_range_with_vnode,
                    prefix_hint,
                    prefetch_options,
                )
                .await?,
                &*self.row_serde,
            ))
        })
    }

    /// Scan raw key-value pairs from the relational table with the given `pk_range`
    /// under the given `vnode`.
    async fn iter_kv_with_pk_range(
        &self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode whose key space the iterator is restricted to. For now this
        // parameter is required; in the future, we may make it optional and iterate
        // over every vnode that the `StateTableInner` owns when it is absent.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);

        // TODO: provide a trace of useful params.
        self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
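    /// Stream the change log of this table for the given `vnode`, restricted to
    /// `pk_range` and the epochs in `epoch_range`, yielding [`ChangeLogRow`]s.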
    pub async fn iter_log_with_vnode(
        &self,
        vnode: VirtualNode,
        epoch_range: (u64, u64),
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
    ) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
        let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
        Ok(deserialize_log_stream(
            self.store
                .iter_log(
                    epoch_range,
                    memcomparable_range_with_vnode,
                    ReadLogOptions {
                        table_id: self.table_id,
                    },
                )
                .await?,
            &*self.row_serde,
        )
        .map_err(Into::into))
    }
}

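/// Adapt a raw state-store iterator into a stream of `(key, row)` pairs: the table
/// key is copied into `K` and the value is deserialized into an [`OwnedRow`].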
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

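/// Convert a range of pk prefixes into memcomparable byte bounds.
///
/// Note the asymmetry handled by the helpers below: an `Included` end bound must be
/// widened to the end of the prefix so that all rows sharing it are covered, while an
/// `Excluded` start bound must be advanced past the whole prefix.
///
/// A sketch of the intended behavior (the pk layout and values are illustrative):
///
/// ```ignore
/// // Suppose pk = (a: i32, b: i32) and both bounds are prefixes over `a` only.
/// // (Included([1]), Included([5])) maps to
/// // (Included(ser([1])), end_bound_of_prefix(ser([5]))),
/// // so that a row like (5, 42) still falls inside the range.
/// ```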
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

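/// Like [`prefix_range_to_memcomparable`], but chains the fixed `pk_prefix` in front
/// of both bounds of `sub_range`. An `Unbounded` end bound is clamped to the end of
/// the serialized prefix so the scan never leaves the prefix's key space.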
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

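/// Serialize the start bound of a prefix range. An `Excluded` prefix cannot be
/// expressed directly on serialized keys, since every row sharing that prefix must be
/// skipped; `start_bound_of_excluded_prefix` advances the bound past the whole prefix.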
fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

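/// Serialize the end bound of a prefix range. An `Included` prefix is widened with
/// `end_bound_of_prefix` so that all rows sharing it are covered; an `Unbounded` bound
/// is clamped to `serialized_pk_prefix` when one is supplied.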
fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

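/// Rebuild a full-width chunk from a replicated chunk that only carries the output
/// columns: columns present in `i2o_mapping` are reused as-is, and the remaining
/// columns are filled with `NULL`s of the corresponding type. See
/// `test_fill_non_output_indices` below for a concrete case.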
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}