risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators take append-only (insert) input, so this number should not
/// need to be very large. We may want to revisit this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// This macro marks a point where, in insane mode only, we randomly discard the operation
/// and return early.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in a KV state store
/// (`StateStore`) with row-based encoding.
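///
/// A typical executor-side lifecycle, as an illustrative sketch (the catalog, store, epochs
/// and row values here are assumptions for the example, not from this crate's tests):
///
/// ```ignore
/// let mut table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, None)
///     .forbid_preload_all_rows()
///     .build()
///     .await;
/// // Must be called after receiving and yielding the first barrier.
/// table.init_epoch(first_epoch).await?;
/// table.insert(row);
/// // Commit on barrier, yield the barrier, then finish the post-commit callback.
/// let post_commit = table.commit(new_epoch).await?;
/// // ... yield the barrier downstream ...
/// post_commit.post_yield_barrier(None).await?;
/// ```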
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of the primary key.
    /// Note that the indices are based on all columns of the table, not just the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set are
    /// visible to the executor. The table also checks whether written rows conform to this
    /// partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the top-N primary keys, for reducing unnecessary range deletions.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Used to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state
    /// table, so that the schema of the row matches the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices, to use them for backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` uses `BasicSerde` by default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It reduces state-cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding
    /// the first barrier; otherwise, deadlock is likely to happen.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
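
// Illustrative note on the macro above: with `value_indices = &Some(vec![0, 2])`,
// `dispatch_value_indices!(&self.value_indices, [row], { ... })` first rebinds `row` to
// `row.project(&[0, 2])` and then evaluates the body; with `&None`, the body sees the full
// row unchanged.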

/// `StateTableRowStore` extracts the row-store logic from `StateTable`. It serves a
/// functionality similar to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to
/// read (`get` and `iter`) by serialized key (or key range), returning `OwnedRow`s, and to
/// write (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)` pairs.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    // Should only be used for debugging, in the panic message of `handle_mem_table_error`.
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
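        // When row preloading is enabled (`all_rows` is `Some`), rebuild the in-memory image
        // by scanning every vnode in the given bitmap from the state store.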
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // `split_off` returns everything after the given key,
                                    // including the key itself.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn `Exclude(watermark)` into `Include(next_key(watermark))`.
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off and discard `Exclude(watermark)..`, keeping
                                    // `..=watermark`.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches
    ///   the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and
    /// store the old value.
    LogStoreEnabled,
}

pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}
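
// Illustrative builder usage (a sketch; `table_catalog`, `store`, and `config` are assumed
// to be in scope):
//
//     let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, None)
//         .enable_preload_all_rows_by_config(&config)
//         .build()
//         .await;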

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = self.table_catalog.id, e = %e.as_report(), "table configured to preload rows to memory, but the feature is disabled by license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, are used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store, with sanity check disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // `value_indices` is `None` when the value columns are exactly all table columns in
        // their original order (i.e., no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, that means it supports schema change. In that
        // case, the row encoding should be column-aware as well. Otherwise, both will be false.
        // NOTE(kwannoel): A replicated table follows the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to the replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get any one
        // of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    /// Get the vnode value with the given (prefix of the) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
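    ///
    /// # Example (illustrative sketch)
    ///
    /// ```ignore
    /// // `pk` must be the full primary key of the row.
    /// if let Some(row) = state_table.get_row(&pk).await? {
    ///     // use the deserialized `OwnedRow`
    /// }
    /// ```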
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given pk exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we mark
/// the flag as false again; on the next `state_table.commit()`, we assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers, rather than only for the barrier that updates the vnode bitmap. In this way, though we don't have
/// scaling tests for every streaming executor, we can ensure that all executors covered by the normal e2e tests have properly handled the
/// `StateTablePostCommit`.
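///
/// Illustrative handling in an executor (a sketch; the barrier-yielding code is an assumption
/// about the caller, not part of this struct):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier);
/// // Pass `Some(new_vnodes)` only if this barrier carries a vnode bitmap update.
/// post_commit.post_yield_barrier(new_vnodes).await?;
/// ```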
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table. Returns the previous vnode bitmap and
    /// whether the cache may be stale.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

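    // The write methods below write through: when row preloading is enabled (`all_rows` is
    // `Some`), the per-vnode `BTreeMap` is updated in lockstep with the mem-table write, so
    // point gets and iteration can later be served from memory.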
    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. Must provide a full row corresponding to the
    /// column desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. Must provide a full row of the old value
    /// corresponding to the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new value should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record into the state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // Allow `izip!`, which uses `zip` instead of `zip_eq`.
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the
    /// state table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = %self.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

1337    async fn commit_inner(
1338        &mut self,
1339        new_epoch: EpochPair,
1340        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1341    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
1342    {
1343        assert!(!self.on_post_commit);
1344        assert_eq!(
1345            self.epoch.expect("should only be called after init").curr,
1346            new_epoch.prev
1347        );
1348        if let Some(new_consistency_level) = switch_consistent_op {
1349            assert_ne!(self.op_consistency_level, new_consistency_level);
1350            self.op_consistency_level = new_consistency_level;
1351        }
1352        trace!(
1353            table_id = %self.table_id,
1354            epoch = ?self.epoch,
1355            "commit state table"
1356        );
1357
1358        let table_watermarks = self.commit_pending_watermark();
1359        self.row_store
1360            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1361            .await?;
1362        self.epoch = Some(new_epoch);
1363
1364        // Refresh watermark cache if it is out of sync.
1365        if USE_WATERMARK_CACHE
1366            && !self.watermark_cache.is_synced()
1367            && let Some(ref watermark) = self.committed_watermark
1368        {
1369            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
1370                (Included(once(Some(watermark.clone()))), Unbounded);
1371            // NOTE(kwannoel): We buffer `pks` before inserting into watermark cache
1372            // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
1373            // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
1374            // TODO(kwannoel): We can optimize it with:
1375            // 1. Either use `RefCell`.
1376            // 2. Or pass in a direct reference to LocalStateStore,
1377            //    instead of referencing it indirectly from `self`.
1378            //    Similar to how we do for pk_indices.
1379            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
1380            {
1381                let mut streams = vec![];
1382                for vnode in self.vnodes().iter_vnodes() {
1383                    let stream = self
1384                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
1385                        .await?;
1386                    streams.push(Box::pin(stream));
1387                }
1388                let merged_stream = merge_sort(streams);
1389                pin_mut!(merged_stream);
1390
1391                #[for_await]
1392                for entry in merged_stream.take(self.watermark_cache.capacity()) {
1393                    let keyed_row = entry?;
1394                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
1395                    // watermark column should be part of the pk
1396                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
1397                        pks.push(pk);
1398                    }
1399                }
1400            }
1401
1402            let mut filler = self.watermark_cache.begin_syncing();
1403            for pk in pks {
1404                filler.insert_unchecked(DefaultOrdered(pk), ());
1405            }
1406            filler.finish();
1407
1408            let n_cache_entries = self.watermark_cache.len();
1409            if n_cache_entries < self.watermark_cache.capacity() {
1410                self.watermark_cache.set_table_row_count(n_cache_entries);
1411            }
1412        }
1413
1414        self.on_post_commit = true;
1415        Ok(StateTablePostCommit { inner: self })
1416    }

    /// Commit the pending watermark and return the (direction, vnode watermarks,
    /// serde type) triple to seal, if any.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = match self.clean_watermark_index_in_pk {
            None => self.pk_serde.index(0),
            Some(clean_watermark_index_in_pk) => {
                self.pk_serde.index(clean_watermark_index_in_pk as usize)
            }
        };

        let watermark_type = match self.clean_watermark_index_in_pk {
            None | Some(0) => WatermarkSerdeType::PkPrefix,
            Some(_) => WatermarkSerdeType::NonPkPrefix,
        };

        let should_clean_watermark = if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
            if let Some(key) = self.watermark_cache.lowest_key() {
                watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
            } else {
                // The watermark cache is synced but contains no key,
                // which implies the table is empty,
                // so there is nothing to clean.
                false
            }
        } else {
            // Either we are not using the watermark cache,
            // or the cache is not synced.
            // In both cases we should clean by watermark.
            true
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute the delete ranges.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            let direction = if order_type.is_ascending() {
                WatermarkDirection::Ascending
            } else {
                WatermarkDirection::Descending
            };
            Some((
                direction,
                VnodeWatermark::new(
                    self.vnodes().clone(),
                    Bytes::copy_from_slice(watermark_suffix.as_ref()),
                ),
                watermark_type,
            ))
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of the delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    refill from the largest value of the cache as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, watermark_type)| {
            (direction, vec![watermark], watermark_type)
        })
    }
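
    // How this fits into the commit flow (sketch, hypothetical caller): an
    // executor records a watermark during the epoch, and the next commit seals
    // it so that the storage layer can clean rows below it. The `commit` and
    // `update_watermark` entry points are assumed here for illustration.
    //
    //     state_table.update_watermark(watermark_scalar);
    //     // ... apply the epoch's inserts/deletes ...
    //     let post_commit = state_table.commit(new_epoch).await?;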

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

// Manually expand trait aliases for a better IDE experience.
pub trait RowStream<'a>: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<OwnedRow>> + 'a> RowStream<'a> for S {}

pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, S: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a> KeyedRowStream<'a> for S {}

pub trait PkRowStream<'a, K>: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a {}
impl<'a, K, S: Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a> PkRowStream<'a, K> for S {}

/// Builds the key type yielded by the iterators from a vnode and the key bytes
/// that follow it.
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    /// Re-attaches the vnode prefix to produce the full table key.
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    /// Discards the key, for callers that only need the row.
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}
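
// Why two impls: the key type parameter `K` lets iterator callers choose at
// compile time whether key bytes are materialized. The in-memory path stores
// keys without the vnode prefix, so `Bytes::from_vnode_bytes` re-attaches it to
// yield full table keys, while `()` discards the key and avoids the allocation
// entirely. A minimal illustration (hypothetical free function):
//
//     fn demo(vnode: VirtualNode, key_without_vnode: &Bytes) {
//         // Full, vnode-prefixed table key.
//         let full_key: Bytes = Bytes::from_vnode_bytes(vnode, key_without_vnode);
//         // Key discarded; only the row is of interest.
//         let _: () = <() as FromVnodeBytes>::from_vnode_bytes(vnode, key_without_vnode);
//         let _ = full_key;
//     }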

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under the
    /// same `vnode`.
    pub async fn iter_with_vnode(
        &self,
        // The vnode on which the iteration is performed. For now, a concrete vnode is required;
        // in the future, accepting `None` here could mean iterating over every vnode that the
        // `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}
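
// Usage sketch (hypothetical caller; `state_table` and `vnode` are assumed to
// be in scope): scan a single vnode over the full pk range and drain the
// stream.
//
//     let range: (Bound<OwnedRow>, Bound<OwnedRow>) = (Unbounded, Unbounded);
//     let stream = state_table
//         .iter_with_vnode(vnode, &range, PrefetchOptions::default())
//         .await?;
//     pin_mut!(stream);
//     while let Some(row) = stream.next().await {
//         let row: OwnedRow = row?;
//         // ... process `row` ...
//     }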

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// The lowest-level iterator API, on top of which the middle-level APIs are built:
    /// - [`StateTableInner::iter_with_prefix_inner`]
    /// - [`StateTableInner::iter_kv_with_pk_range`]
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        // Serve the iterator from the in-memory snapshot when all rows are cached.
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

    /// The reverse-order counterpart of [`Self::iter_kv`].
    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        // Serve the iterator from the in-memory snapshot when all rows are cached.
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}
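
// Design note: both `iter_kv` and `rev_iter_kv` unify the in-memory
// `futures::stream::iter` arm and the storage-backed arm through
// `futures::future::Either`, which implements `Stream` whenever both arms do.
// This keeps a single concrete return type without boxing the streams.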

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the given `pk_prefix` and
    /// `sub_range` under the same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it scans
    /// all rows with the given `pk_prefix`. `pk_prefix` is also used to identify the exact vnode
    /// the scan should be performed on.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }
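
    // Usage sketch (hypothetical caller): scan every row under one prefix
    // value, e.g. a join key, for a table whose pk is `(join_key, ...)`. The
    // variable names are assumptions for illustration.
    //
    //     let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
    //     let stream = state_table
    //         .iter_with_prefix(row::once(Some(join_key)), sub_range, PrefetchOptions::default())
    //         .await?;
    //     pin_mut!(stream);
    //     while let Some(row) = stream.next().await {
    //         let row: OwnedRow = row?;
    //         // ... rows sharing `join_key`, in pk order ...
    //     }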

    /// Get the row from a state table that holds at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the value from a state table that holds at most one row with a single column.
    ///
    /// `None` can mean either that the row was never persisted, or that a persisted `NULL`
    /// was read back, which does not matter for this use case.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assumption is violated, something must be wrong with the operator
        // implementation or the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// This function scans raw key-values from the relational table with the specific `pk_range`
    /// under the same `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode on which the iteration is performed. For now, a concrete vnode is required;
        // in the future, accepting `None` here could mean iterating over every vnode that the
        // `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        // TODO: provide a trace of useful params.
        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

/// Converts a row-typed pk range into a memcomparable byte range, treating each
/// bound as a pk prefix.
pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}
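
// Worked example (values illustrative): for an ascending pk `(a, b)` and the
// range `(Excluded(row(a = 1)), Included(row(a = 3)))`:
// - the start bound serializes `row(1)` and skips past every key carrying that
//   prefix via `start_bound_of_excluded_prefix`, so no `(1, *)` row is
//   returned;
// - the end bound serializes `row(3)` and extends to the end of that prefix
//   via `end_bound_of_prefix`, so every `(3, *)` row is still included.
// With an unbounded end bound, `end_range_to_memcomparable` falls back to
// `end_bound_of_prefix` of the serialized pk prefix, if one was provided.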

/// Fills the columns that are not present in the output with NULLs, according to
/// `i2o_mapping`, so that the chunk matches the full schema given by `data_types`.
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}