risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators only receive inserts (append-only),
/// so this number should not need to be very large.
/// But we may want to improve this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// This macro is used to mark a point where we want to randomly discard the operation and early
/// return, only in insane mode.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
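///
/// A minimal usage sketch (hypothetical and for illustration only; the catalog,
/// store, vnodes, and epochs are assumed to be set up by the caller):
///
/// ```ignore
/// let mut table = StateTable::from_table_catalog(&table_catalog, store, Some(vnodes)).await;
/// table.init_epoch(first_epoch).await?;
/// table.insert(row);
/// let post_commit = table.commit(next_epoch).await?;
/// // Yield the barrier downstream before applying any vnode bitmap update.
/// post_commit.post_yield_barrier(None).await?;
/// ```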
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set will be
    /// visible to the executor. The table will also check whether the written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache for the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for backfill state.
    pub output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as the default serde.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate the upstream shared buffer.
/// Used for the `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, a deadlock is likely to happen.
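    ///
    /// A sketch of the expected call order in an executor (hypothetical names, for
    /// illustration only):
    ///
    /// ```ignore
    /// let first_barrier = expect_first_barrier(&mut input).await?;
    /// yield Message::Barrier(first_barrier.clone());
    /// // Only now is it safe to initialize the state table.
    /// state_table.init_epoch(first_barrier.epoch).await?;
    /// ```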
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "fail to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "fail to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check fail");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}
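
// For illustration (not part of the original source): a call like
// `dispatch_value_indices!(&self.value_indices, [row], { self.row_store.insert(key, row) })`
// expands roughly to:
//
//     if let Some(value_indices) = &self.value_indices {
//         let row = row.project(value_indices);
//         self.row_store.insert(key, row)
//     } else {
//         self.row_store.insert(key, row)
//     }
//
// i.e. rows are projected onto the value columns only when the table does not
// store the full row.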

/// The storage layer extracted from `StateTable`. It serves functionality similar to a
/// `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to read (`get` and `iter`)
/// over a serialized key (or key range) and return `OwnedRow`s, and to write
/// (`insert`, `delete`, `update`) on `(TableKey<Bytes>, OwnedRow)`.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    // Should only be used for debugging, in the panic message of `handle_mem_table_error`.
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // `split_off` returns everything at and after the given key,
                                    // so this keeps only the rows at or above the watermark.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn `Exclude(watermark)` into `Include(next_key(watermark))`.
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off the range `(Exclude(watermark)..)` and keep
                                    // `(..=Include(watermark))`.
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "row preloading is disabled because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches
    ///   the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and
    /// store the old value.
    LogStoreEnabled,
}

pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id=self.table_catalog.id, e=%e.as_report(), "table configured to preload rows to memory but disabled by license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}
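
// For illustration (not part of the original source): a typical way to build a
// state table, assuming `table_catalog`, `store`, `vnodes`, and a streaming
// `config` are available:
//
//     let table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, vnodes)
//         .with_op_consistency_level(StateTableOpConsistencyLevel::ConsistentOldValue)
//         .enable_preload_all_rows_by_config(&config)
//         .build()
//         .await;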

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here should be used by
// the replicated state table, apart from `from_table_catalog_inner`.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create state table from table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store, with sanity checks disabled.
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create state table from table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Whether `value_indices` covers all columns of the table in order (i.e. no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, it supports schema change; in that case, the row
        // encoding should be column-aware as well. Otherwise both will be false.
        // NOTE(kwannoel): A replicated table will follow the upstream table's versioning. I'm not
        // sure if ALTER TABLE will propagate to this replicated table as well. Ideally it won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute i2o mapping
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any 1 of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of the) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create replicated state table from table catalog with output indices
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to project the row to the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given primary key exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        // TODO: avoid clone when `on_key_value_fn` can be non-static
        let row_serde = self.row_serde.clone();

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`, and after the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag `on_post_commit` to indicate whether the `StateTablePostCommit` is handled
/// properly. On `state_table.commit()`, we will mark `on_post_commit` as true, and in `StateTablePostCommit::post_yield_barrier`, we will
/// mark the flag as false again; on the next `state_table.commit()`, we will assert that `on_post_commit` is false. Note that `post_yield_barrier`
/// should be called for all barriers rather than only for the barrier that updates the vnode bitmap. In this way, though we don't have scale tests
/// for all streaming executors, we can ensure that all executors covered by normal e2e tests have properly handled the `StateTablePostCommit`.
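///
/// A sketch of the expected usage (hypothetical executor code, for illustration only):
///
/// ```ignore
/// let post_commit = state_table.commit(barrier.epoch).await?;
/// yield Message::Barrier(barrier);
/// // Apply the vnode bitmap update (if any) only after the barrier has been yielded.
/// post_commit.post_yield_barrier(new_vnode_bitmap).await?;
/// ```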
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table, returns the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. Must provide a full row corresponding to the column
    /// desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. Must provide a full row of the old value corresponding
    /// to the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new values should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record into the state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // allow(izip, which uses zip instead of zip_eq)
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
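    ///
    /// A sketch of a typical pattern (hypothetical, for illustration only): the executor
    /// feeds the latest watermark before committing, and state below the watermark is
    /// cleaned when the commit happens.
    ///
    /// ```ignore
    /// state_table.update_watermark(latest_watermark_scalar);
    /// let post_commit = state_table.commit(barrier.epoch).await?;
    /// ```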
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = self.table_id.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

1340    async fn commit_inner(
1341        &mut self,
1342        new_epoch: EpochPair,
1343        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
1344    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
1345    {
1346        assert!(!self.on_post_commit);
1347        assert_eq!(
1348            self.epoch.expect("should only be called after init").curr,
1349            new_epoch.prev
1350        );
1351        if let Some(new_consistency_level) = switch_consistent_op {
1352            assert_ne!(self.op_consistency_level, new_consistency_level);
1353            self.op_consistency_level = new_consistency_level;
1354        }
1355        trace!(
1356            table_id = %self.table_id,
1357            epoch = ?self.epoch,
1358            "commit state table"
1359        );
1360
1361        let table_watermarks = self.commit_pending_watermark();
1362        self.row_store
1363            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
1364            .await?;
1365        self.epoch = Some(new_epoch);
1366
1367        // Refresh watermark cache if it is out of sync.
1368        if USE_WATERMARK_CACHE
1369            && !self.watermark_cache.is_synced()
1370            && let Some(ref watermark) = self.committed_watermark
1371        {
1372            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
1373                (Included(once(Some(watermark.clone()))), Unbounded);
1374            // NOTE(kwannoel): We buffer `pks` before inserting into watermark cache
1375            // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
1376            // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
1377            // TODO(kwannoel): We can optimize it with:
1378            // 1. Either use `RefCell`.
1379            // 2. Or pass in a direct reference to LocalStateStore,
1380            //    instead of referencing it indirectly from `self`.
1381            //    Similar to how we do for pk_indices.
1382            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
1383            {
1384                let mut streams = vec![];
1385                for vnode in self.vnodes().iter_vnodes() {
1386                    let stream = self
1387                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
1388                        .await?;
1389                    streams.push(Box::pin(stream));
1390                }
1391                let merged_stream = merge_sort(streams);
1392                pin_mut!(merged_stream);
1393
1394                #[for_await]
1395                for entry in merged_stream.take(self.watermark_cache.capacity()) {
1396                    let keyed_row = entry?;
1397                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // The watermark column should be part of the pk. Skip rows whose
                    // watermark column is NULL, since they cannot be ordered against the
                    // watermark.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

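            // Fewer collected entries than the cache capacity means the cache may cover the
            // entire table, so its length is recorded as the table row count.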
            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }

    /// Commit the pending watermark, returning the watermark direction, per-vnode
    /// watermarks, and watermark serde type to seal, if there is any pending watermark.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "see pending watermark on empty pk"
        );
        let watermark_serializer = match self.clean_watermark_index_in_pk {
            None => self.pk_serde.index(0),
            Some(clean_watermark_index_in_pk) => {
                self.pk_serde.index(clean_watermark_index_in_pk as usize)
            }
        };

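        // A watermark on the first pk column (the default when no index is recorded) is a
        // pk-prefix watermark; a watermark on any later pk column is a non-pk-prefix one.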
        let watermark_type = match self.clean_watermark_index_in_pk {
            None => WatermarkSerdeType::PkPrefix,
            Some(clean_watermark_index_in_pk) => match clean_watermark_index_in_pk {
                0 => WatermarkSerdeType::PkPrefix,
                _ => WatermarkSerdeType::NonPkPrefix,
            },
        };

        let should_clean_watermark = {
            if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
                if let Some(key) = self.watermark_cache.lowest_key() {
                    watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
                } else {
                    // The watermark cache is synced but holds no key, which implies the
                    // table is empty, so there is nothing to clean.
                    false
                }
            } else {
                // Either we are not using the watermark cache, or it is not synced.
                // In either case, we should clean the watermark.
                true
            }
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute delete ranges.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            let direction = if order_type.is_ascending() {
                WatermarkDirection::Ascending
            } else {
                WatermarkDirection::Descending
            };
            Some((
                direction,
                VnodeWatermark::new(
                    self.vnodes().clone(),
                    Bytes::copy_from_slice(watermark_suffix.as_ref()),
                ),
                watermark_type,
            ))
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    we just refill from the largest value of the cache, as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, watermark_serde_type)| {
            (direction, vec![watermark], watermark_serde_type)
        })
    }

    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

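/// Converts the key bytes of an iterated entry under a vnode into the key type expected by
/// the caller: `Bytes` re-attaches the vnode prefix to recover the full serialized table
/// key, while `()` simply drops the key for iterators that only need rows.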
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under
    /// the same `vnode`.
    pub async fn iter_with_vnode(
        &self,
        // The vnode to iterate over. For now this parameter is required; in the future it
        // may become optional, with `None` meaning iterating over each vnode that the
        // `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

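    /// Like [`Self::iter_with_vnode`], but yields [`KeyedRow`]s that carry the serialized
    /// table key alongside each row.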
    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

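    /// Like [`Self::iter_with_vnode`], but projects each row onto `output_indices`.
    /// Only valid for replicated tables, hence the `IS_REPLICATED` assertion.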
    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    /// The lowest-level iterator API, called by the middle-level APIs:
    /// - [`StateTableInner::iter_with_prefix_inner`]
    /// - [`StateTableInner::iter_kv_with_pk_range`]
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
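        // When all rows of this table are kept in memory (`all_rows`), serve the scan
        // directly from the in-memory map instead of going through the state store.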
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

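    /// Same as [`Self::iter_kv`], but scans in reverse (descending key) order.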
    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_prefix` and
    /// `sub_range` under the same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it
    /// scans all rows with the given `pk_prefix`. The `pk_prefix` is also used to identify
    /// the exact vnode the scan should be performed on.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table that stores at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
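            // A table holding at most one row must not yield a second one.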
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the value from a state table that stores at most one row with a single column.
    ///
    /// `None` can mean either that the row was never persisted, or that a `NULL` was
    /// persisted, which makes no difference in the use case.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

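    /// Like [`Self::iter_with_prefix`], but yields [`KeyedRow`]s that carry the serialized
    /// table key alongside each row.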
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

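    /// Shared implementation behind the prefix iterators: `REVERSE` selects the scan
    /// direction, and `K` selects whether serialized keys are materialized (`Bytes`) or
    /// dropped (`()`).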
    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assumption is violated, something must be wrong with the operator
        // implementation or the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

    /// This function scans raw key-values from the relational table with the specific
    /// `pk_range` under the same `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode to iterate over. For now this parameter is required; in the future it
        // may become optional, with `None` meaning iterating over each vnode that the
        // `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        // TODO: provide a trace of useful params.
        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

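/// Converts a raw state-store iterator into a stream of `(key, row)` pairs, deserializing
/// each value with `deserializer` and copying the table key into `K`.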
fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

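/// Serializes `pk_prefix` chained with `sub_range` into memcomparable key bounds: an
/// unbounded start collapses to the serialized prefix itself (inclusive), and an unbounded
/// end collapses to the end bound of the prefix.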
fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        // An unbounded sub-range start begins at the prefix itself, inclusively.
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => Included(serialize_pk_prefix(r)),
        // The excluded bound may be a strict prefix of full pks, so exclusion must skip the
        // whole key space under that prefix.
        Excluded(r) => start_bound_of_excluded_prefix(&serialize_pk_prefix(r)),
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        // The included bound may be a strict prefix of full pks, so inclusion must cover
        // the whole key space under that prefix.
        Included(r) => end_bound_of_prefix(&serialize_pk_prefix(r)),
        Excluded(r) => Excluded(serialize_pk_prefix(r)),
    }
}

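/// Expands a chunk that contains only the output columns back to the full schema described
/// by `data_types`, filling each non-output column with NULLs.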
fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column);
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}