risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators handle append-only (insert) workloads,
/// so this number should not need to be very large.
/// We may want to tune this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface accessing relational data in KV(`StateStore`) with
/// row-based encoding.
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data
    store: S,

    /// Current epoch
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the indices are based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only the rows whose primary-key vnode is in this set are
    /// visible to the executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache for the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Used to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the returned `StateTablePostCommit` yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate upstream shared buffer.
/// Used for `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
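    ///
    /// A hedged sketch of the expected call order in an executor prologue (the surrounding
    /// names are illustrative, not part of this module):
    ///
    /// ```ignore
    /// let first_barrier = expect_first_barrier(&mut input).await?;
    /// yield Message::Barrier(first_barrier.clone());
    /// state_table.init_epoch(first_barrier.epoch).await?;
    /// ```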
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

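/// Build an [`OpConsistencyLevel::ConsistentOldValue`] whose `check_old_value` callback treats
/// two serialized values as equal if their bytes match, or, failing that, if they deserialize
/// to the same row. The latter tolerates different encodings of the same row.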
fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

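/// Project the given row variables onto `value_indices` (when set) before evaluating `$body`.
/// A hedged sketch of what `dispatch_value_indices!(&self.value_indices, [value], { .. })`
/// expands to:
///
/// ```ignore
/// if let Some(value_indices) = &self.value_indices {
///     let value = value.project(value_indices);
///     { .. }
/// } else {
///     { .. }
/// }
/// ```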
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

/// Extracts the storage-access logic of `StateTable` into `StateTableRowStore`, which provides
/// functionality similar to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: methods to read (`get` and
/// `iter`) over a serialized key (or key range) returning `OwnedRow`, and methods to write
/// (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
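    /// When `Some`, an in-memory mirror of all rows owned by this table, keyed first by vnode
    /// and then by the vnode-stripped key bytes; `None` means row preloading is disabled.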
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    // should only be used for debugging in the panic message of `handle_mem_table_error`
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
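    /// If row preloading is enabled (`all_rows` is `Some`), rebuild the in-memory mirror by
    /// scanning every vnode in `vnode_bitmap` from the state store.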
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // `split_off` returns everything at and after the given key,
                                    // so only keys at or above the watermark are kept.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn Exclude(vnode_watermark.watermark()) into
                                // Include(next_key(vnode_watermark.watermark())).
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off (Exclude(vnode_watermark.watermark()) ..) and
                                    // keep (.. Include(vnode_watermark.watermark())).
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading for this table is disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously
    /// - Delete and Update op should ensure that the key exists and the previous value matches the passed old value
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and store the old value.
    LogStoreEnabled,
}

pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = self.table_catalog.id, e = %e.as_report(), "table configured to preload rows to memory, but the feature is disabled by license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}
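
// A hedged usage sketch of the builder (the `table_catalog`, `store`, and `config` bindings are
// assumptions from the calling context, not defined in this module):
//
//     let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, None)
//         .enable_preload_all_rows_by_config(&config)
//         .build()
//         .await;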

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, should be used by a replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `value_indices` is `None` if it covers all columns in order (no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema change; in that case, the row
        // encoding should be column-aware as well. Otherwise both will be false.
        // NOTE(kwannoel): A replicated table will follow the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute i2o mapping
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any 1 of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check pk indices of upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given pk exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

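    /// Serialize the pk and, when the pk length equals `prefix_hint_len`, also return the
    /// vnode-stripped serialized bytes as a prefix hint, so the storage layer can use its
    /// bloom filter on reads.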
    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        // TODO: avoid clone when `on_key_value_fn` can be non-static
        let row_serde = self.row_serde.clone();

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly: `state_table.commit()` sets `on_post_commit` to true, `StateTablePostCommit::post_yield_barrier` resets it to false, and
/// `state_table.commit()` asserts that `on_post_commit` is false. Note that `post_yield_barrier` should be called for all barriers,
/// rather than only for the barrier with a vnode bitmap update. In this way, though we don't have scale tests for all
/// streaming executors, we can ensure that all executors covered by the normal e2e tests have properly handled the `StateTablePostCommit`.
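///
/// A hedged sketch of the expected call order in an executor (identifiers are illustrative):
///
/// ```ignore
/// let post_commit = state_table.commit(new_epoch).await?;
/// yield Message::Barrier(barrier);
/// // `new_vnodes` is `Some(..)` only when the barrier carries a vnode bitmap update.
/// post_commit.post_yield_barrier(new_vnodes).await?;
/// ```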
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

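    // NOTE: each write below first updates the in-memory `all_rows` mirror (when row preloading
    // is enabled) and then applies the same operation to the local state store, so the two stay
    // in sync within an epoch.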
    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into state table. Must provide a full row corresponding to the column desc of
    /// the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from state table. Must provide a full row of old value corresponding to the
    /// column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new value should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record into state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // allow(izip, which uses zip instead of zip_eq)
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = self.table_id.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

1340    async fn commit_inner(
1341        &mut self,
1342        new_epoch: EpochPair,
1343        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1344    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
1345    {
1346        assert!(!self.on_post_commit);
1347        assert_eq!(
1348            self.epoch.expect("should only be called after init").curr,
1349            new_epoch.prev
1350        );
1351        if let Some(new_consistency_level) = switch_consistent_op {
1352            assert_ne!(self.op_consistency_level, new_consistency_level);
1353            self.op_consistency_level = new_consistency_level;
1354        }
1355        trace!(
1356            table_id = %self.table_id,
1357            epoch = ?self.epoch,
1358            "commit state table"
1359        );
1360
1361        let table_watermarks = self.commit_pending_watermark();
1362        self.row_store
1363            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1364            .await?;
1365        self.epoch = Some(new_epoch);
1366
1367        // Refresh watermark cache if it is out of sync.
1368        if USE_WATERMARK_CACHE
1369            && !self.watermark_cache.is_synced()
1370            && let Some(ref watermark) = self.committed_watermark
1371        {
1372            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
1373                (Included(once(Some(watermark.clone()))), Unbounded);
1374            // NOTE(kwannoel): We buffer `pks` before inserting into watermark cache
1375            // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
1376            // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
1377            // TODO(kwannoel): We can optimize it with:
1378            // 1. Either use `RefCell`.
1379            // 2. Or pass in a direct reference to LocalStateStore,
1380            //    instead of referencing it indirectly from `self`.
1381            //    Similar to how we do for pk_indices.
1382            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
1383            {
1384                let mut streams = vec![];
1385                for vnode in self.vnodes().iter_vnodes() {
1386                    let stream = self
1387                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
1388                        .await?;
1389                    streams.push(Box::pin(stream));
1390                }
1391                let merged_stream = merge_sort(streams);
1392                pin_mut!(merged_stream);
1393
1394                #[for_await]
1395                for entry in merged_stream.take(self.watermark_cache.capacity()) {
1396                    let keyed_row = entry?;
1397                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // The watermark column is part of the pk; only cache rows whose
                    // watermark column is non-NULL.
1399                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
1400                        pks.push(pk);
1401                    }
1402                }
1403            }
1404
1405            let mut filler = self.watermark_cache.begin_syncing();
1406            for pk in pks {
1407                filler.insert_unchecked(DefaultOrdered(pk), ());
1408            }
1409            filler.finish();
1410
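            // If the scan returned fewer entries than the cache capacity, it was exhaustive,
            // so the number of cached entries is exactly the table's current row count.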
1411            let n_cache_entries = self.watermark_cache.len();
1412            if n_cache_entries < self.watermark_cache.capacity() {
1413                self.watermark_cache.set_table_row_count(n_cache_entries);
1414            }
1415        }
1416
1417        self.on_post_commit = true;
1418        Ok(StateTablePostCommit { inner: self })
1419    }
1420
    /// Commit the pending watermark, returning the direction, per-vnode watermarks, and
    /// watermark serde type to seal, if any.
1422    fn commit_pending_watermark(
1423        &mut self,
1424    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
1425        let watermark = self.pending_watermark.take()?;
1426        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
1427
        assert!(
            !self.pk_indices().is_empty(),
            "pending watermark on a table with an empty pk"
        );
        let watermark_serializer = self
            .pk_serde
            .index(self.clean_watermark_index_in_pk.unwrap_or(0) as usize);
1440
        let watermark_type = match self.clean_watermark_index_in_pk {
            None | Some(0) => WatermarkSerdeType::PkPrefix,
            Some(_) => WatermarkSerdeType::NonPkPrefix,
        };
1448
        let should_clean_watermark = if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
            if let Some(key) = self.watermark_cache.lowest_key() {
                watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
            } else {
                // The watermark cache is synced but holds no keys, which implies the table
                // is empty, so there is nothing to clean.
                false
            }
        } else {
            // Either the watermark cache is unused or it is not synced.
            // In either case, we should clean by the watermark.
            true
        };
1469
1470        let watermark_suffix =
1471            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);
1472
        // Compute the watermark to seal for state cleaning.
1474        let seal_watermark = if should_clean_watermark {
1475            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
1476                self.vnodes().iter_vnodes().collect_vec()
1477            }, "delete range");
1478
            let order_type = watermark_serializer.get_order_types().first().unwrap();
1480
            let direction = if order_type.is_ascending() {
                WatermarkDirection::Ascending
            } else {
                WatermarkDirection::Descending
            };
            Some((
                direction,
                VnodeWatermark::new(
                    self.vnodes().clone(),
                    Bytes::copy_from_slice(watermark_suffix.as_ref()),
                ),
                watermark_type,
            ))
1500        } else {
1501            None
1502        };
1503        self.committed_watermark = Some(watermark);
1504
1505        // Clear the watermark cache and force a resync.
1506        // TODO(kwannoel): This can be further optimized:
1507        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
1508        //    up to the largest end of delete ranges.
1509        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache, start from its largest value as the lower bound.
1512        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
1513            self.watermark_cache.clear();
1514        }
1515
        seal_watermark.map(|(direction, watermark, watermark_serde_type)| {
            (direction, vec![watermark], watermark_serde_type)
        })
1519    }
1520
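    /// Proactively flush buffered writes to the state store without sealing an epoch,
    /// delegating to the row store's `try_flush`.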
1521    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
1522        self.row_store.try_flush().await?;
1523        Ok(())
1524    }
1525}
1526
1527// Manually expand trait alias for better IDE experience.
1528pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
1529impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}
1530
1531pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
1532impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}
1533
1534pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
1535impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}
1536
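/// Builds the key part of an iterator item from a vnode and the vnode-stripped key bytes.
///
/// `Bytes` re-attaches the vnode prefix to recover the full table key, while `()` discards the
/// key entirely for value-only iteration.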
1537pub trait FromVnodeBytes {
1538    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
1539}
1540
1541impl FromVnodeBytes for Bytes {
1542    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
1543        prefix_slice_with_vnode(vnode, bytes)
1544    }
1545}
1546
1547impl FromVnodeBytes for () {
1548    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
1549}
1550
1551// Iterator functions
1552impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
1553    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
1554where
1555    S: StateStore,
1556    SD: ValueRowSerde,
1557{
1558    /// This function scans rows from the relational table with specific `pk_range` under the same
1559    /// `vnode`.
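    ///
    /// A hedged usage sketch (minimal; `table` and `vnode` are illustrative bindings, not
    /// defined here):
    ///
    /// ```ignore
    /// let pk_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_vnode(vnode, &pk_range, PrefetchOptions::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     let row: OwnedRow = row?;
    ///     // ... consume `row` ...
    /// }
    /// ```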
1560    pub async fn iter_with_vnode(
1561        &self,
        // The vnode under which to iterate: only rows in the given `pk_range` within this
        // vnode are returned. For now this parameter is required; in the future, a `None`
        // could mean iterating over every vnode that the `StateTableInner` owns.
1566        vnode: VirtualNode,
1567        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1568        prefetch_options: PrefetchOptions,
1569    ) -> StreamExecutorResult<impl RowStream<'_>> {
1570        Ok(self
1571            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
1572            .await?
1573            .map_ok(|(_, row)| row))
1574    }
1575
1576    pub async fn iter_keyed_row_with_vnode(
1577        &self,
1578        vnode: VirtualNode,
1579        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1580        prefetch_options: PrefetchOptions,
1581    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1582        Ok(self
1583            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
1584            .await?
1585            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
1586    }
1587
1588    pub async fn iter_with_vnode_and_output_indices(
1589        &self,
1590        vnode: VirtualNode,
1591        pk_range: &(Bound<impl Row>, Bound<impl Row>),
1592        prefetch_options: PrefetchOptions,
1593    ) -> StreamExecutorResult<impl RowStream<'_>> {
1594        assert!(IS_REPLICATED);
1595        let stream = self
1596            .iter_with_vnode(vnode, pk_range, prefetch_options)
1597            .await?;
        Ok(stream.map_ok(|row| row.project(&self.output_indices).into_owned_row()))
1599    }
1600}
1601
1602impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// The lowest-level iterator API, on top of which the middle-level APIs are built:
    /// - [`StateTableInner::iter_with_prefix_inner`]
    /// - [`StateTableInner::iter_kv_with_pk_range`]
1607    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1608        &self,
1609        vnode: VirtualNode,
1610        (start, end): (Bound<Bytes>, Bound<Bytes>),
1611        prefix_hint: Option<Bytes>,
1612        prefetch_options: PrefetchOptions,
1613    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1614        if let Some(rows) = &self.all_rows {
1615            return Ok(futures::future::Either::Left(futures::stream::iter(
1616                rows.get(&vnode)
1617                    .expect("covered vnode")
1618                    .range((start, end))
1619                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
1620            )));
1621        }
1622        let read_options = ReadOptions {
1623            prefix_hint,
1624            retention_seconds: self.table_option.retention_seconds,
1625            prefetch_options,
1626            cache_policy: CachePolicy::Fill(Hint::Normal),
1627        };
1628
1629        Ok(futures::future::Either::Right(
1630            deserialize_keyed_row_stream(
1631                self.state_store
1632                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
1633                    .await?,
1634                &*self.row_serde,
1635            ),
1636        ))
1637    }
1638
1639    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
1640        &self,
1641        vnode: VirtualNode,
1642        (start, end): (Bound<Bytes>, Bound<Bytes>),
1643        prefix_hint: Option<Bytes>,
1644        prefetch_options: PrefetchOptions,
1645    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1646        if let Some(rows) = &self.all_rows {
1647            return Ok(futures::future::Either::Left(futures::stream::iter(
1648                rows.get(&vnode)
1649                    .expect("covered vnode")
1650                    .range((start, end))
1651                    .rev()
1652                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
1653            )));
1654        }
1655        let read_options = ReadOptions {
1656            prefix_hint,
1657            retention_seconds: self.table_option.retention_seconds,
1658            prefetch_options,
1659            cache_policy: CachePolicy::Fill(Hint::Normal),
1660        };
1661
1662        Ok(futures::future::Either::Right(
1663            deserialize_keyed_row_stream(
1664                self.state_store
1665                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
1666                    .await?,
1667                &*self.row_serde,
1668            ),
1669        ))
1670    }
1671}
1672
1673impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
1674    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
1675where
1676    S: StateStore,
1677    SD: ValueRowSerde,
1678{
    /// Scan rows from the relational table with the given `pk_prefix` and `sub_range` under the
    /// same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, all rows matching `pk_prefix`
    /// are scanned. `pk_prefix` also identifies the exact vnode the scan is performed on.
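    ///
    /// A hedged usage sketch (minimal; `table` and the single-`i64` prefix column are
    /// illustrative assumptions):
    ///
    /// ```ignore
    /// let sub_range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
    /// let stream = table
    ///     .iter_with_prefix(row::once(Some(1_i64.into())), &sub_range, Default::default())
    ///     .await?;
    /// pin_mut!(stream);
    /// while let Some(row) = stream.next().await {
    ///     // Rows sharing the prefix, in pk order.
    ///     let _row = row?;
    /// }
    /// ```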
1682    pub async fn iter_with_prefix(
1683        &self,
1684        pk_prefix: impl Row,
1685        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1686        prefetch_options: PrefetchOptions,
1687    ) -> StreamExecutorResult<impl RowStream<'_>> {
1688        let stream = self.iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
1689            .await?;
1690        Ok(stream.map_ok(|(_, row)| row))
1691    }
1692
    /// Get the single row from a state table that is expected to hold at most one row.
1694    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
1695        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
1696        let stream = self
1697            .iter_with_prefix(row::empty(), sub_range, Default::default())
1698            .await?;
1699        pin_mut!(stream);
1700
1701        if let Some(res) = stream.next().await {
1702            let value = res?.into_owned_row();
1703            assert!(stream.next().await.is_none());
1704            Ok(Some(value))
1705        } else {
1706            Ok(None)
1707        }
1708    }
1709
    /// Get the value from a state table that holds at most one row with a single column.
1711    ///
1712    /// `None` can mean either the row is never persisted, or is a persisted `NULL`,
1713    /// which does not matter in the use case.
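    ///
    /// A hedged sketch of the intended use (illustrative):
    ///
    /// ```ignore
    /// // e.g. reading back a single-value aggregation state
    /// let value: Option<ScalarImpl> = table.get_from_one_value_table().await?;
    /// ```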
1714    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
1715        Ok(self
1716            .get_from_one_row_table()
1717            .await?
1718            .and_then(|row| row[0].clone()))
1719    }
1720
1721    pub async fn iter_keyed_row_with_prefix(
1722        &self,
1723        pk_prefix: impl Row,
1724        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1725        prefetch_options: PrefetchOptions,
1726    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
1727        Ok(
1728            self.iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
1729                .await?.map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)),
1730        )
1731    }
1732
1733    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
1734    pub async fn rev_iter_with_prefix(
1735        &self,
1736        pk_prefix: impl Row,
1737        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1738        prefetch_options: PrefetchOptions,
1739    ) -> StreamExecutorResult<impl RowStream<'_>> {
1740        Ok(
1741            self.iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
1742                .await?.map_ok(|(_, row)| row),
1743        )
1744    }
1745
1746    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
1747        &self,
1748        pk_prefix: impl Row,
1749        sub_range: &(Bound<impl Row>, Bound<impl Row>),
1750        prefetch_options: PrefetchOptions,
1751    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
1752        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
1753        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
1754
1755        // We assume that all usages of iterating the state table only access a single vnode.
1756        // If this assertion fails, then something must be wrong with the operator implementation or
1757        // the distribution derivation from the optimizer.
1758        let vnode = self.compute_prefix_vnode(&pk_prefix);
1759
1760        // Construct prefix hint for prefix bloom filter.
1761        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
1762        if self.prefix_hint_len != 0 {
1763            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
1764        }
1765        let prefix_hint = {
1766            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
1767                None
1768            } else {
1769                let encoded_prefix_len = self
1770                    .pk_serde
1771                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
1772
1773                Some(Bytes::copy_from_slice(
1774                    &encoded_prefix[..encoded_prefix_len],
1775                ))
1776            }
1777        };
1778
1779        trace!(
1780            table_id = %self.table_id(),
1781            ?prefix_hint, ?pk_prefix,
1782            ?pk_prefix_indices,
1783            iter_direction = if REVERSE { "reverse" } else { "forward" },
1784            "storage_iter_with_prefix"
1785        );
1786
1787        let memcomparable_range =
1788            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
1789
1790        Ok(if REVERSE {
1791            futures::future::Either::Left(
1792                self.row_store
1793                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
1794                    .await?,
1795            )
1796        } else {
1797            futures::future::Either::Right(
1798                self.row_store
1799                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
1800                    .await?,
1801            )
1802        })
1803    }
1804
1805    /// This function scans raw key-values from the relational table with specific `pk_range` under
1806    /// the same `vnode`.
1807    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
1808        &'a self,
1809        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode under which to iterate: only rows in the given `pk_range` within this
        // vnode are returned. For now this parameter is required; in the future, a `None`
        // could mean iterating over every vnode that the `StateTableInner` owns.
1813        vnode: VirtualNode,
1814        prefetch_options: PrefetchOptions,
1815    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
1816        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
1817
1818        // TODO: provide a trace of useful params.
1819        self.row_store
1820            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
1821            .await
1822    }
1823
1824    #[cfg(test)]
1825    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
1826        &self.watermark_cache
1827    }
1828}
1829
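/// Adapts a raw [`StateStoreIter`] into a stream of `(key, row)` pairs: the table key is copied
/// into `K`, and the value bytes are deserialized into an [`OwnedRow`].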
1830fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
1831    iter: impl StateStoreIter + 'a,
1832    deserializer: &'a impl ValueRowSerde,
1833) -> impl PkRowStream<'a, K> {
1834    iter.into_stream(move |(key, value)| {
1835        Ok((
1836            K::copy_from_slice(key.user_key.table_key.as_ref()),
1837            deserializer.deserialize(value).map(OwnedRow::new)?,
1838        ))
1839    })
1840    .map_err(Into::into)
1841}
1842
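/// Convert a range of (possibly partial) pk rows into the corresponding memcomparable byte
/// range, widening each bound so that all rows sharing an included prefix are covered.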
1843pub fn prefix_range_to_memcomparable(
1844    pk_serde: &OrderedRowSerde,
1845    range: &(Bound<impl Row>, Bound<impl Row>),
1846) -> (Bound<Bytes>, Bound<Bytes>) {
1847    (
1848        start_range_to_memcomparable(pk_serde, &range.0),
1849        end_range_to_memcomparable(pk_serde, &range.1, None),
1850    )
1851}
1852
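/// Like [`prefix_range_to_memcomparable`], but scopes a `sub_range` under a fixed `pk_prefix`:
/// the prefix is chained onto each bound before serialization, and an unbounded end is
/// tightened to the end bound of the serialized prefix.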
1853fn prefix_and_sub_range_to_memcomparable(
1854    pk_serde: &OrderedRowSerde,
1855    sub_range: &(Bound<impl Row>, Bound<impl Row>),
1856    pk_prefix: impl Row,
1857) -> (Bound<Bytes>, Bound<Bytes>) {
1858    let (range_start, range_end) = sub_range;
1859    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
1860    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
1861    let start_range = match range_start {
1862        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
1863        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
1864        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
1865    };
1866    let end_range = match range_end {
1867        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
1868        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
1869        Unbounded => Unbounded,
1870    };
1871    (
1872        start_range_to_memcomparable(pk_serde, &start_range),
1873        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
1874    )
1875}
1876
1877fn start_range_to_memcomparable<R: Row>(
1878    pk_serde: &OrderedRowSerde,
1879    bound: &Bound<R>,
1880) -> Bound<Bytes> {
1881    let serialize_pk_prefix = |pk_prefix: &R| {
1882        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
1883        serialize_pk(pk_prefix, &prefix_serializer)
1884    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => Included(serialize_pk_prefix(r)),
        Excluded(r) => start_bound_of_excluded_prefix(&serialize_pk_prefix(r)),
    }
1898}
1899
1900fn end_range_to_memcomparable<R: Row>(
1901    pk_serde: &OrderedRowSerde,
1902    bound: &Bound<R>,
1903    serialized_pk_prefix: Option<Bytes>,
1904) -> Bound<Bytes> {
1905    let serialize_pk_prefix = |pk_prefix: &R| {
1906        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
1907        serialize_pk(pk_prefix, &prefix_serializer)
1908    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => end_bound_of_prefix(&serialize_pk_prefix(r)),
        Excluded(r) => Excluded(serialize_pk_prefix(r)),
    }
1924}
1925
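/// Expand a replicated chunk to the full table schema: columns mapped by `i2o_mapping` are
/// copied through, while the remaining (non-output) columns are filled with NULLs.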
1926fn fill_non_output_indices(
1927    i2o_mapping: &ColIndexMapping,
1928    data_types: &[DataType],
1929    chunk: StreamChunk,
1930) -> StreamChunk {
1931    let cardinality = chunk.cardinality();
1932    let (ops, columns, vis) = chunk.into_inner();
1933    let mut full_columns = Vec::with_capacity(data_types.len());
1934    for (i, data_type) in data_types.iter().enumerate() {
1935        if let Some(j) = i2o_mapping.try_map(i) {
1936            full_columns.push(columns[j].clone());
1937        } else {
1938            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
1939            column_builder.append_n_null(cardinality);
1940            let column: ArrayRef = column_builder.finish().into();
1941            full_columns.push(column)
1942        }
1943    }
1944    let data_chunk = DataChunk::new(full_columns, vis);
1945    StreamChunk::from_parts(ops, data_chunk)
1946}
1947
1948#[cfg(test)]
1949mod tests {
1950    use std::fmt::Debug;
1951
1952    use expect_test::{Expect, expect};
1953
1954    use super::*;
1955
1956    fn check(actual: impl Debug, expect: Expect) {
1957        let actual = format!("{:#?}", actual);
1958        expect.assert_eq(&actual);
1959    }
1960
1961    #[test]
1962    fn test_fill_non_output_indices() {
1963        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
1964        let replicated_chunk = [OwnedRow::new(vec![
1965            Some(222_i32.into()),
1966            Some(2_i32.into()),
1967        ])];
1968        let replicated_chunk = StreamChunk::from_parts(
1969            vec![Op::Insert],
1970            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
1971        );
1972        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
1973        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
1974        check(
1975            filled_chunk,
1976            expect![[r#"
1977            StreamChunk { cardinality: 1, capacity: 1, data:
1978            +---+---+---+-----+
1979            | + | 2 |   | 222 |
1980            +---+---+---+-----+
1981             }"#]],
1982        );
1983    }
1984}