// risingwave_stream/common/table/state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use std::time::Instant;

use bytes::Bytes;
use either::Either;
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt, pin_mut};
use futures_async_stream::for_await;
use itertools::Itertools;
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
    ColumnDesc, ColumnId, TableId, TableOption, get_dist_key_in_pk_indices,
};
use risingwave_common::config::StreamingConfig;
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, Once, OwnedRow, Row, RowExt, once};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKey, end_bound_of_prefix, next_key, prefix_slice_with_vnode,
    prefixed_range_with_vnode, start_bound_of_excluded_prefix,
};
use risingwave_hummock_sdk::table_watermark::{
    VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_pb::catalog::Table;
use risingwave_storage::StateStore;
use risingwave_storage::error::{ErrorKind, StorageError, StorageResult};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
    deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::*;
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{KeyedRow, TableDistribution};
use thiserror_ext::AsReport;
use tracing::{Instrument, trace};

use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::StreamExecutorResult;

/// Most watermark operators only have inserts (append-only),
/// so this number should not need to be very large.
/// But we may want to improve this choice in the future.
const WATERMARK_CACHE_ENTRIES: usize = 16;

/// This macro marks a point where, in insane mode only, we randomly discard the
/// operation and return early.
macro_rules! insane_mode_discard_point {
    () => {{
        use rand::Rng;
        if crate::consistency::insane() && rand::rng().random_bool(0.3) {
            return;
        }
    }};
}

/// `StateTableInner` is the interface for accessing relational data in KV (`StateStore`) with
/// row-based encoding.
pub struct StateTableInner<
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    row_store: StateTableRowStore<S::Local, SD>,

    /// State store for accessing snapshot data.
    store: S,

    /// Current epoch.
    epoch: Option<EpochPair>,

    /// Used for serializing and deserializing the primary key.
    pk_serde: OrderedRowSerde,

    /// Indices of primary key.
    /// Note that the indices are based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    /// Distribution of the state table.
    ///
    /// It holds the vnode bitmap. Only rows whose primary-key vnode is in this set will be
    /// visible to the executor. The table will also check whether written rows
    /// conform to this partition.
    distribution: TableDistribution,

    prefix_hint_len: usize,

    value_indices: Option<Vec<usize>>,

    /// Pending watermark for state cleaning. Old states below this watermark will be cleaned when committing.
    pending_watermark: Option<ScalarImpl>,
    /// Last committed watermark for state cleaning. Will be restored on state table recovery.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the top-N primary keys for reducing unnecessary range deletion.
    watermark_cache: StateTableWatermarkCache,

    /// Data types of the table columns.
    /// Needed to build data chunks from state table rows.
    data_types: Vec<DataType>,

    /// "i" here refers to the base `state_table`'s actual schema.
    /// "o" here refers to the replicated state table's output schema.
    /// This mapping is used to reconstruct a row being written from the replicated state table,
    /// such that the schema of this row will match the full schema of the base state table.
    /// It is only applicable for replication.
    i2o_mapping: ColIndexMapping,

    /// Output indices.
    /// Used for:
    /// 1. Computing `output_value_indices` to ser/de replicated rows.
    /// 2. Computing output pk indices to use them for the backfill state.
    output_indices: Vec<usize>,

    op_consistency_level: StateTableOpConsistencyLevel,

    clean_watermark_index_in_pk: Option<i32>,

    /// Flag to indicate whether the state table has called `commit`, but has not called
    /// `post_yield_barrier` on the `StateTablePostCommit` callback yet.
    on_post_commit: bool,
}

/// `StateTable` will use `BasicSerde` as default.
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
/// `ReplicatedStateTable` is meant to replicate upstream shared buffer.
/// Used for `ArrangementBackfill` executor.
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
/// `WatermarkCacheStateTable` caches the watermark column.
/// It will reduce state cleaning overhead.
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
    StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;
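
// A hedged sketch of the typical lifecycle in a streaming executor (names like
// `table_catalog`, `store`, `row`, `first_epoch` and `new_epoch` are illustrative,
// not part of this module):
//
//     let mut table: StateTable<_> = StateTableBuilder::new(&table_catalog, store, None)
//         .forbid_preload_all_rows()
//         .build()
//         .await;
//     table.init_epoch(first_epoch).await?;  // after receiving and yielding the first barrier
//     table.insert(row);                     // stage mutations in the mem-table
//     let post_commit = table.commit(new_epoch).await?;
//     // ... yield the barrier downstream ...
//     post_commit.post_yield_barrier(None).await?;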

// initialize
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// In streaming executors, this method must be called **after** receiving and yielding the
    /// first barrier; otherwise, deadlock is likely to happen.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.row_store
            .init(epoch, self.distribution.vnodes())
            .await?;
        assert_eq!(None, self.epoch.replace(epoch), "should not init twice");
        Ok(())
    }

    pub async fn try_wait_committed_epoch(&self, prev_epoch: u64) -> StorageResult<()> {
        self.store
            .try_wait_epoch(
                HummockReadEpoch::Committed(prev_epoch),
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub fn state_store(&self) -> &S {
        &self.store
    }
}

fn consistent_old_value_op(
    row_serde: Arc<impl ValueRowSerde>,
    is_log_store: bool,
) -> OpConsistencyLevel {
    OpConsistencyLevel::ConsistentOldValue {
        check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
            if first == second {
                return true;
            }
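            // The bytes differ, but the two values may still encode the same logical row
            // (the value encoding is not necessarily canonical), so fall back to comparing
            // the deserialized rows.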
            let first = match row_serde.deserialize(first) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize serialized value");
                    return false;
                }
            };
            let second = match row_serde.deserialize(second) {
                Ok(rows) => rows,
                Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize serialized value");
                    return false;
                }
            };
            if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
                false
            } else {
                true
            }
        }),
        is_log_store,
    }
}

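/// If `value_indices` is `Some`, project each of the given row variables onto those indices
/// before evaluating `$body`; otherwise, evaluate `$body` with the full rows.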
macro_rules! dispatch_value_indices {
    ($value_indices:expr, [$($row_var_name:ident),+], $body:expr) => {
        if let Some(value_indices) = $value_indices {
            $(
                let $row_var_name = $row_var_name.project(value_indices);
            )+
            $body
        } else {
            $body
        }
    };
}

/// `StateTableRowStore` extracts the row-store logic from `StateTable`. It serves
/// functionality similar to a `BTreeMap<TableKey<Bytes>, OwnedRow>`: it provides methods to
/// read (`get` and `iter`) by serialized key (or key range), returning `OwnedRow`s, and to
/// write (`insert`, `delete`, `update`) `(TableKey<Bytes>, OwnedRow)` pairs.
struct StateTableRowStore<LS: LocalStateStore, SD: ValueRowSerde> {
    state_store: LS,
    all_rows: Option<HashMap<VirtualNode, BTreeMap<Bytes, OwnedRow>>>,

    table_id: TableId,
    table_option: TableOption,
    row_serde: Arc<SD>,
    // Should only be used for debugging in the panic message of `handle_mem_table_error`.
    pk_serde: OrderedRowSerde,
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn may_reload_all_rows(&mut self, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        if let Some(rows) = &mut self.all_rows {
            rows.clear();
            let start_time = Instant::now();
            *rows = try_join_all(vnode_bitmap.iter_vnodes().map(|vnode| {
                let state_store = &self.state_store;
                let retention_seconds = self.table_option.retention_seconds;
                let row_serde = &self.row_serde;
                async move {
                    let mut rows = BTreeMap::new();
                    let memcomparable_range_with_vnode =
                        prefixed_range_with_vnode::<Bytes>(.., vnode);
                    // TODO: set read options
                    let stream = deserialize_keyed_row_stream::<Bytes>(
                        state_store
                            .iter(
                                memcomparable_range_with_vnode,
                                ReadOptions {
                                    prefix_hint: None,
                                    prefetch_options: Default::default(),
                                    cache_policy: Default::default(),
                                    retention_seconds,
                                },
                            )
                            .await?,
                        &**row_serde,
                    );
                    pin_mut!(stream);
                    while let Some((encoded_key, row)) = stream.try_next().await? {
                        let key = TableKey(encoded_key);
                        let (iter_vnode, key) = key.split_vnode_bytes();
                        assert_eq!(vnode, iter_vnode);
                        rows.try_insert(key, row).expect("non-duplicated");
                    }
                    Ok((vnode, rows)) as StreamExecutorResult<_>
                }
            }))
            .await?
            .into_iter()
            .collect();
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(table_id = %self.table_id, vnode_count = vnode_bitmap.count_ones(), duration = ?start_time.elapsed(), "finished reloading all rows");
            }
        }
        Ok(())
    }

    async fn init(&mut self, epoch: EpochPair, vnode_bitmap: &Bitmap) -> StreamExecutorResult<()> {
        self.state_store.init(InitOptions::new(epoch)).await?;
        self.may_reload_all_rows(vnode_bitmap).await
    }

    async fn update_vnode_bitmap(
        &mut self,
        vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<Arc<Bitmap>> {
        let prev_vnodes = self.state_store.update_vnode_bitmap(vnodes.clone()).await?;
        self.may_reload_all_rows(&vnodes).await?;
        Ok(prev_vnodes)
    }

    async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.state_store.try_flush().await?;
        Ok(())
    }

    async fn seal_current_epoch(
        &mut self,
        next_epoch: u64,
        table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)>,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<()> {
        if let Some((direction, watermarks, serde_type)) = &table_watermarks
            && let Some(rows) = &mut self.all_rows
        {
            match serde_type {
                WatermarkSerdeType::PkPrefix => {
                    for vnode_watermark in watermarks {
                        match direction {
                            WatermarkDirection::Ascending => {
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // `split_off` returns everything at or after the given key.
                                    *rows = rows.split_off(vnode_watermark.watermark());
                                }
                            }
                            WatermarkDirection::Descending => {
                                // Turn Exclude(vnode_watermark.watermark()) into
                                // Include(next_key(vnode_watermark.watermark())).
                                let split_off_key = next_key(vnode_watermark.watermark());
                                for vnode in vnode_watermark.vnode_bitmap().iter_vnodes() {
                                    let rows = rows.get_mut(&vnode).expect("covered vnode");
                                    // Split off (Exclude(vnode_watermark.watermark())..) and keep
                                    // (..Include(vnode_watermark.watermark())).
                                    rows.split_off(split_off_key.as_slice());
                                }
                            }
                        }
                    }
                }
                WatermarkSerdeType::NonPkPrefix => {
                    warn!(table_id = %self.table_id, "preload-all-rows is disabled for this table because a non-pk-prefix watermark was written");
                    self.all_rows = None;
                }
            }
        }
        self.state_store
            .flush()
            .instrument(tracing::info_span!("state_table_flush"))
            .await?;
        let switch_op_consistency_level =
            switch_consistent_op.map(|new_consistency_level| match new_consistency_level {
                StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
                StateTableOpConsistencyLevel::ConsistentOldValue => {
                    consistent_old_value_op(self.row_serde.clone(), false)
                }
                StateTableOpConsistencyLevel::LogStoreEnabled => {
                    consistent_old_value_op(self.row_serde.clone(), true)
                }
            });
        self.state_store.seal_current_epoch(
            next_epoch,
            SealCurrentEpochOptions {
                table_watermarks,
                switch_op_consistency_level,
            },
        );
        Ok(())
    }
}

#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Op is inconsistent.
    Inconsistent,
    /// Op is consistent.
    /// - Insert op should ensure that the key does not exist previously.
    /// - Delete and Update op should ensure that the key exists and the previous value matches
    ///   the passed old value.
    ConsistentOldValue,
    /// The requirement on operation consistency is the same as `ConsistentOldValue`.
    /// The difference is that with `LogStoreEnabled`, the state table should also flush and
    /// store the old value.
    LogStoreEnabled,
}
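
// The consistency level can be switched at an epoch boundary via
// `commit_may_switch_consistent_op`; see the `commit` family of methods below.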

pub struct StateTableBuilder<
    'a,
    S,
    SD,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> {
    table_catalog: &'a Table,
    store: S,
    vnodes: Option<Arc<Bitmap>>,
    op_consistency_level: Option<StateTableOpConsistencyLevel>,
    output_column_ids: Option<Vec<ColumnId>>,
    preload_all_rows: PreloadAllRow,

    _serde: PhantomData<SD>,
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, ()>
{
    pub fn new(table_catalog: &'a Table, store: S, vnodes: Option<Arc<Bitmap>>) -> Self {
        Self {
            table_catalog,
            store,
            vnodes,
            op_consistency_level: None,
            output_column_ids: None,
            preload_all_rows: (),
            _serde: Default::default(),
        }
    }

    fn with_preload_all_rows(
        self,
        preload_all_rows: bool,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        StateTableBuilder {
            table_catalog: self.table_catalog,
            store: self.store,
            vnodes: self.vnodes,
            op_consistency_level: self.op_consistency_level,
            output_column_ids: self.output_column_ids,
            preload_all_rows,
            _serde: Default::default(),
        }
    }

    pub fn enable_preload_all_rows_by_config(
        self,
        config: &StreamingConfig,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        let developer = &config.developer;
        let preload_all_rows = if developer.default_enable_mem_preload_state_table {
            !developer
                .mem_preload_state_table_ids_blacklist
                .contains(&self.table_catalog.id)
        } else {
            developer
                .mem_preload_state_table_ids_whitelist
                .contains(&self.table_catalog.id)
        };
        self.with_preload_all_rows(preload_all_rows)
    }

    pub fn forbid_preload_all_rows(
        self,
    ) -> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool> {
        self.with_preload_all_rows(false)
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
    PreloadAllRow,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, PreloadAllRow>
{
    pub fn with_op_consistency_level(
        mut self,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> Self {
        self.op_consistency_level = Some(op_consistency_level);
        self
    }

    pub fn with_output_column_ids(mut self, output_column_ids: Vec<ColumnId>) -> Self {
        self.output_column_ids = Some(output_column_ids);
        self
    }
}

impl<
    'a,
    S: StateStore,
    SD: ValueRowSerde,
    const IS_REPLICATED: bool,
    const USE_WATERMARK_CACHE: bool,
> StateTableBuilder<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE, bool>
{
    pub async fn build(self) -> StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        let mut preload_all_rows = self.preload_all_rows;
        if preload_all_rows
            && let Err(e) =
                risingwave_common::license::Feature::StateTableMemoryPreload.check_available()
        {
            warn!(table_id = self.table_catalog.id, e = %e.as_report(), "table is configured to preload rows into memory, but the feature is disabled by license");
            preload_all_rows = false;
        }
        StateTableInner::from_table_catalog_inner(
            self.table_catalog,
            self.store,
            self.vnodes,
            self.op_consistency_level
                .unwrap_or(StateTableOpConsistencyLevel::ConsistentOldValue),
            self.output_column_ids.unwrap_or_default(),
            preload_all_rows,
        )
        .await
    }
}

// initialize
// FIXME(kwannoel): Enforce that none of the constructors here, apart from
// `from_table_catalog_inner`, are used by the replicated state table.
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a state table from the table catalog and store.
    ///
    /// If `vnodes` is `None`, [`TableDistribution::singleton()`] will be used.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create a state table from the table catalog and store, with sanity check disabled.
    #[cfg(any(test, feature = "test"))]
    pub async fn from_table_catalog_inconsistent_op(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
    ) -> Self {
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .forbid_preload_all_rows()
            .build()
            .await
    }

    /// Create a state table from the table catalog and store.
    async fn from_table_catalog_inner(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        op_consistency_level: StateTableOpConsistencyLevel,
        output_column_ids: Vec<ColumnId>,
        preload_all_rows: bool,
    ) -> Self {
        let table_id = TableId::new(table_catalog.id);
        let table_columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|col| col.column_desc.as_ref().unwrap().into())
            .collect();
        let data_types: Vec<DataType> = table_catalog
            .columns
            .iter()
            .map(|col| {
                col.get_column_desc()
                    .unwrap()
                    .get_column_type()
                    .unwrap()
                    .into()
            })
            .collect();
        let order_types: Vec<OrderType> = table_catalog
            .pk
            .iter()
            .map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
            .collect();
        let dist_key_indices: Vec<usize> = table_catalog
            .distribution_key
            .iter()
            .map(|dist_index| *dist_index as usize)
            .collect();

        let pk_indices = table_catalog
            .pk
            .iter()
            .map(|col_order| col_order.column_index as usize)
            .collect_vec();

        // FIXME(yuhao): only use `dist_key_in_pk` in the proto
        let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
            get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
        } else {
            table_catalog
                .get_dist_key_in_pk()
                .iter()
                .map(|idx| *idx as usize)
                .collect()
        };

        let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
            let vnode_col_idx = *idx as usize;
            pk_indices.iter().position(|&i| vnode_col_idx == i)
        });

        let distribution =
            TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
        assert_eq!(
            distribution.vnode_count(),
            table_catalog.vnode_count(),
            "vnode count mismatch, scanning table {} under wrong distribution?",
            table_catalog.name,
        );

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);

        let input_value_indices = table_catalog
            .value_indices
            .iter()
            .map(|val| *val as usize)
            .collect_vec();

        let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();

        // Whether `value_indices` covers all columns in order (i.e. no shuffle).
        let value_indices = match input_value_indices.len() == table_columns.len()
            && input_value_indices == no_shuffle_value_indices
        {
            true => None,
            false => Some(input_value_indices),
        };
        let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;

        let row_serde = Arc::new(SD::new(
            Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
            Arc::from(table_columns.clone().into_boxed_slice()),
        ));

        let state_table_op_consistency_level = op_consistency_level;
        let op_consistency_level = match op_consistency_level {
            StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
            StateTableOpConsistencyLevel::ConsistentOldValue => {
                consistent_old_value_op(row_serde.clone(), false)
            }
            StateTableOpConsistencyLevel::LogStoreEnabled => {
                consistent_old_value_op(row_serde.clone(), true)
            }
        };

        let table_option = TableOption::new(table_catalog.retention_seconds);
        let new_local_options = if IS_REPLICATED {
            NewLocalOptions::new_replicated(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
            )
        } else {
            NewLocalOptions::new(
                table_id,
                op_consistency_level,
                table_option,
                distribution.vnodes().clone(),
                true,
            )
        };
        let local_state_store = store.new_local(new_local_options).await;

        // If the state table has versioning, that means it supports schema change.
        // In that case, the row encoding should be column-aware as well.
        // Otherwise, both will be false.
        // NOTE(kwannoel): A replicated table will follow the upstream table's versioning. I'm
        // not sure if ALTER TABLE will propagate to this replicated table as well. Ideally it
        // won't.
        assert_eq!(
            table_catalog.version.is_some(),
            row_serde.kind().is_column_aware()
        );

        // Restore the persisted table watermark.
        let watermark_serde = if pk_indices.is_empty() {
            None
        } else {
            match table_catalog.clean_watermark_index_in_pk {
                None => Some(pk_serde.index(0)),
                Some(clean_watermark_index_in_pk) => {
                    Some(pk_serde.index(clean_watermark_index_in_pk as usize))
                }
            }
        };
        let max_watermark_of_vnodes = distribution
            .vnodes()
            .iter_vnodes()
            .filter_map(|vnode| local_state_store.get_table_watermark(vnode))
            .max();
        let committed_watermark = if let Some(deser) = watermark_serde
            && let Some(max_watermark) = max_watermark_of_vnodes
        {
            let deserialized = deser.deserialize(&max_watermark).ok().and_then(|row| {
                assert!(row.len() == 1);
                row[0].clone()
            });
            if deserialized.is_none() {
                tracing::error!(
                    vnodes = ?distribution.vnodes(),
                    watermark = ?max_watermark,
                    "Failed to deserialize persisted watermark from state store.",
                );
            }
            deserialized
        } else {
            None
        };

        let watermark_cache = if USE_WATERMARK_CACHE {
            StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
        } else {
            StateTableWatermarkCache::new(0)
        };

        // Get info for the replicated state table.
        let output_column_ids_to_input_idx = output_column_ids
            .iter()
            .enumerate()
            .map(|(pos, id)| (*id, pos))
            .collect::<HashMap<_, _>>();

        // Compute column descriptions.
        let columns: Vec<ColumnDesc> = table_catalog
            .columns
            .iter()
            .map(|c| c.column_desc.as_ref().unwrap().into())
            .collect_vec();

        // Compute the i2o mapping.
        // Note that this can be a partial mapping, since we use the i2o mapping to get
        // any one of the output columns, and use that to fill the input column.
        let mut i2o_mapping = vec![None; columns.len()];
        for (i, column) in columns.iter().enumerate() {
            if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
                i2o_mapping[i] = Some(*pos);
            }
        }
        // We can prune any duplicate column indices.
        let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());

        // Compute output indices.
        let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);

        Self {
            table_id,
            row_store: StateTableRowStore {
                all_rows: preload_all_rows.then(HashMap::new),
                table_option,
                state_store: local_state_store,
                row_serde,
                pk_serde: pk_serde.clone(),
                table_id,
            },
            store,
            epoch: None,
            pk_serde,
            pk_indices,
            distribution,
            prefix_hint_len,
            value_indices,
            pending_watermark: None,
            committed_watermark,
            watermark_cache,
            data_types,
            output_indices,
            i2o_mapping,
            op_consistency_level: state_table_op_consistency_level,
            clean_watermark_index_in_pk: table_catalog.clean_watermark_index_in_pk,
            on_post_commit: false,
        }
    }

    pub fn get_data_types(&self) -> &[DataType] {
        &self.data_types
    }

    pub fn table_id(&self) -> u32 {
        self.table_id.table_id
    }

    /// Get the vnode value with the given (prefix of) primary key.
    fn compute_prefix_vnode(&self, pk_prefix: &impl Row) -> VirtualNode {
        self.distribution
            .try_compute_vnode_by_pk_prefix(pk_prefix)
            .expect("For streaming, the given prefix must be enough to calculate the vnode")
    }

    /// Get the vnode value of the given primary key.
    pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
        self.distribution.compute_vnode_by_pk(pk)
    }

    /// NOTE(kwannoel): This is used by backfill.
    /// We want to check the pk indices of the upstream table.
    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        assert!(IS_REPLICATED);
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn pk_serde(&self) -> &OrderedRowSerde {
        &self.pk_serde
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }

    pub fn value_indices(&self) -> &Option<Vec<usize>> {
        &self.value_indices
    }

    pub fn is_consistent_op(&self) -> bool {
        matches!(
            self.op_consistency_level,
            StateTableOpConsistencyLevel::ConsistentOldValue
                | StateTableOpConsistencyLevel::LogStoreEnabled
        )
    }
}

impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Create a replicated state table from the table catalog with output indices.
    pub async fn new_replicated(
        table_catalog: &Table,
        store: S,
        vnodes: Option<Arc<Bitmap>>,
        output_column_ids: Vec<ColumnId>,
    ) -> Self {
        // TODO: can it be ConsistentOldValue?
        // TODO: may enable preload_all_rows
        StateTableBuilder::new(table_catalog, store, vnodes)
            .with_op_consistency_level(StateTableOpConsistencyLevel::Inconsistent)
            .with_output_column_ids(output_column_ids)
            .forbid_preload_all_rows()
            .build()
            .await
    }
}

// point get
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Get a single row from the state table.
    pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        let row = self.row_store.get(serialized_pk, prefix_hint).await?;
        match row {
            Some(row) => {
                if IS_REPLICATED {
                    // If the table is replicated, we need to deserialize the row with the output
                    // indices.
                    let row = row.project(&self.output_indices);
                    Ok(Some(row.into_owned_row()))
                } else {
                    Ok(Some(row))
                }
            }
            None => Ok(None),
        }
    }

    /// Check whether a row with the given primary key exists in the state table.
    pub async fn exists(&self, pk: impl Row) -> StreamExecutorResult<bool> {
        let (serialized_pk, prefix_hint) = self.serialize_pk_and_get_prefix_hint(&pk);
        self.row_store.exists(serialized_pk, prefix_hint).await
    }

    fn serialize_pk(&self, pk: &impl Row) -> TableKey<Bytes> {
        assert!(pk.len() <= self.pk_indices.len());
        serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk))
    }

    fn serialize_pk_and_get_prefix_hint(&self, pk: &impl Row) -> (TableKey<Bytes>, Option<Bytes>) {
        let serialized_pk = self.serialize_pk(&pk);
        let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            #[cfg(debug_assertions)]
            if self.prefix_hint_len != 0 {
                warn!(
                    "prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter"
                );
            }
            None
        };
        (serialized_pk, prefix_hint)
    }
}

impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn get(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<Option<OwnedRow>> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").get(key).cloned());
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };

        // TODO: avoid clone when `on_key_value_fn` can be non-static
        let row_serde = self.row_serde.clone();

        self.state_store
            .on_key_value(key_bytes, read_options, move |_, value| {
                let row = row_serde.deserialize(value)?;
                Ok(OwnedRow::new(row))
            })
            .await
            .map_err(Into::into)
    }

    async fn exists(
        &self,
        key_bytes: TableKey<Bytes>,
        prefix_hint: Option<Bytes>,
    ) -> StreamExecutorResult<bool> {
        if let Some(rows) = &self.all_rows {
            let (vnode, key) = key_bytes.split_vnode();
            return Ok(rows.get(&vnode).expect("covered vnode").contains_key(key));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let result = self
            .state_store
            .on_key_value(key_bytes, read_options, move |_, _| Ok(()))
            .await?;
        Ok(result.is_some())
    }
}

/// A callback struct returned from [`StateTableInner::commit`].
///
/// Introduced to support the single-barrier configuration change proposed in <https://github.com/risingwavelabs/risingwave/issues/18312>.
/// In brief, to correctly handle the configuration change, when each stateful executor receives an upstream barrier, it should handle
/// the barrier in the order of `state_table.commit()` -> `yield barrier` -> `update_vnode_bitmap`.
///
/// The `StateTablePostCommit` captures the mutable reference of `state_table` when calling `state_table.commit()`. After the executor
/// runs `yield barrier`, it should call `StateTablePostCommit::post_yield_barrier` to apply the vnode bitmap update, if there is any.
/// The `StateTablePostCommit` is marked with `must_use`. The method name `post_yield_barrier` indicates that it should be called after
/// we have yielded the barrier. In `StateTable`, we add a flag, `on_post_commit`, to indicate whether the `StateTablePostCommit` is
/// handled properly. On `state_table.commit()`, we mark `on_post_commit` as true; in `StateTablePostCommit::post_yield_barrier`, we
/// mark the flag as false again; and on the next `state_table.commit()`, we assert that `on_post_commit` is false. Note that
/// `post_yield_barrier` should be called for all barriers, rather than only for barriers that update the vnode bitmap. In this way,
/// even though we don't have a scale test for every streaming executor, we can ensure that all executors covered by the normal e2e
/// tests handle the `StateTablePostCommit` properly.
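///
/// A hedged sketch of the expected call order in an executor (`barrier`, `new_epoch`, and
/// `new_vnodes` are illustrative; `new_vnodes` is the optional vnode bitmap update carried
/// by the barrier):
///
/// ```ignore
/// let post_commit = state_table.commit(new_epoch).await?;
/// yield Message::Barrier(barrier);
/// // Must be called for every barrier, even when `new_vnodes` is `None`.
/// post_commit.post_yield_barrier(new_vnodes).await?;
/// ```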
#[must_use]
pub struct StateTablePostCommit<
    'a,
    S,
    SD = BasicSerde,
    const IS_REPLICATED: bool = false,
    const USE_WATERMARK_CACHE: bool = false,
> where
    S: StateStore,
    SD: ValueRowSerde,
{
    inner: &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
}

impl<'a, S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTablePostCommit<'a, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    pub async fn post_yield_barrier(
        mut self,
        new_vnodes: Option<Arc<Bitmap>>,
    ) -> StreamExecutorResult<
        Option<(
            (
                Arc<Bitmap>,
                Arc<Bitmap>,
                &'a mut StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>,
            ),
            bool,
        )>,
    > {
        self.inner.on_post_commit = false;
        Ok(if let Some(new_vnodes) = new_vnodes {
            let (old_vnodes, cache_may_stale) =
                self.update_vnode_bitmap(new_vnodes.clone()).await?;
            Some(((new_vnodes, old_vnodes, self.inner), cache_may_stale))
        } else {
            None
        })
    }

    pub fn inner(&self) -> &StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE> {
        &*self.inner
    }

    /// Update the vnode bitmap of the state table. Returns the previous vnode bitmap.
    async fn update_vnode_bitmap(
        &mut self,
        new_vnodes: Arc<Bitmap>,
    ) -> StreamExecutorResult<(Arc<Bitmap>, bool)> {
        let prev_vnodes = self
            .inner
            .row_store
            .update_vnode_bitmap(new_vnodes.clone())
            .await?;
        assert_eq!(
            &prev_vnodes,
            self.inner.vnodes(),
            "state table and state store vnode bitmap mismatches"
        );

        if self.inner.distribution.is_singleton() {
            assert_eq!(
                &new_vnodes,
                self.inner.vnodes(),
                "should not update vnode bitmap for singleton table"
            );
        }
        assert_eq!(self.inner.vnodes().len(), new_vnodes.len());

        let cache_may_stale = cache_may_stale(self.inner.vnodes(), &new_vnodes);

        if cache_may_stale {
            self.inner.pending_watermark = None;
            if USE_WATERMARK_CACHE {
                self.inner.watermark_cache.clear();
            }
        }

        Ok((
            self.inner.distribution.update_vnode_bitmap(new_vnodes),
            cache_may_stale,
        ))
    }
}

// write
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    fn handle_mem_table_error(&self, e: StorageError) {
        let e = match e.into_inner() {
            ErrorKind::MemTable(e) => e,
            _ => unreachable!("should only get memtable error"),
        };
        match *e {
            MemTableError::InconsistentOperation { key, prev, new, .. } => {
                let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
                panic!(
                    "mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
                    self.table_id,
                    vnode,
                    &key,
                    prev.debug_fmt(&*self.row_serde),
                    new.debug_fmt(&*self.row_serde),
                )
            }
        }
    }

    fn insert(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(&value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, value.into_owned_row());
        }
        self.state_store
            .insert(key, value_bytes, None)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn delete(&mut self, key: TableKey<Bytes>, value: impl Row) {
        insane_mode_discard_point!();
        let value_bytes = self.row_serde.serialize(value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key.split_vnode();
            rows.get_mut(&vnode).expect("covered vnode").remove(key);
        }
        self.state_store
            .delete(key, value_bytes)
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }

    fn update(&mut self, key_bytes: TableKey<Bytes>, old_value: impl Row, new_value: impl Row) {
        insane_mode_discard_point!();
        let new_value_bytes = self.row_serde.serialize(&new_value).into();
        let old_value_bytes = self.row_serde.serialize(old_value).into();
        if let Some(rows) = &mut self.all_rows {
            let (vnode, key) = key_bytes.split_vnode_bytes();
            rows.get_mut(&vnode)
                .expect("covered vnode")
                .insert(key, new_value.into_owned_row());
        }
        self.state_store
            .insert(key_bytes, new_value_bytes, Some(old_value_bytes))
            .unwrap_or_else(|e| self.handle_mem_table_error(e));
    }
}

impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// Insert a row into the state table. Must provide a full row corresponding to the column
    /// desc of the table.
    pub fn insert(&mut self, value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.insert(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [value], {
            self.row_store.insert(key_bytes, value)
        })
    }

    /// Delete a row from the state table. Must provide a full row of old value corresponding to
    /// the column desc of the table.
    pub fn delete(&mut self, old_value: impl Row) {
        let pk_indices = &self.pk_indices;
        let pk = (&old_value).project(pk_indices);
        if USE_WATERMARK_CACHE {
            self.watermark_cache.delete(&pk);
        }

        let key_bytes = self.serialize_pk(&pk);
        dispatch_value_indices!(&self.value_indices, [old_value], {
            self.row_store.delete(key_bytes, old_value)
        })
    }

    /// Update a row. The old and new value should have the same pk.
    pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
        let old_pk = (&old_value).project(self.pk_indices());
        let new_pk = (&new_value).project(self.pk_indices());
        debug_assert!(
            Row::eq(&old_pk, new_pk),
            "pk should not change: {old_pk:?} vs {new_pk:?}. {}",
            self.table_id
        );

        let key_bytes = self.serialize_pk(&new_pk);
        dispatch_value_indices!(&self.value_indices, [old_value, new_value], {
            self.row_store.update(key_bytes, old_value, new_value)
        })
    }

    /// Write a record into the state table. Must have the same schema as the table.
    pub fn write_record(&mut self, record: Record<impl Row>) {
        match record {
            Record::Insert { new_row } => self.insert(new_row),
            Record::Delete { old_row } => self.delete(old_row),
            Record::Update { old_row, new_row } => self.update(old_row, new_row),
        }
    }

    fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
        fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
    }

    /// Write a batch with a `StreamChunk`, which should have the same schema as the table.
    // allow(izip, which uses zip instead of zip_eq)
    #[allow(clippy::disallowed_methods)]
    pub fn write_chunk(&mut self, chunk: StreamChunk) {
        let chunk = if IS_REPLICATED {
            self.fill_non_output_indices(chunk)
        } else {
            chunk
        };

        let vnodes = self
            .distribution
            .compute_chunk_vnode(&chunk, &self.pk_indices);

        for (idx, optional_row) in chunk.rows_with_holes().enumerate() {
            let Some((op, row)) = optional_row else {
                continue;
            };
            let pk = row.project(&self.pk_indices);
            let vnode = vnodes[idx];
            let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, vnode);
            match op {
                Op::Insert | Op::UpdateInsert => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.insert(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.insert(key_bytes, row);
                    });
                }
                Op::Delete | Op::UpdateDelete => {
                    if USE_WATERMARK_CACHE {
                        self.watermark_cache.delete(&pk);
                    }
                    dispatch_value_indices!(&self.value_indices, [row], {
                        self.row_store.delete(key_bytes, row);
                    });
                }
            }
        }
    }

    /// Update the watermark for state cleaning.
    ///
    /// # Arguments
    ///
    /// * `watermark` - Latest watermark received.
    pub fn update_watermark(&mut self, watermark: ScalarImpl) {
        trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
        self.pending_watermark = Some(watermark);
    }

    /// Get the committed watermark of the state table. Watermarks should be fed into the state
    /// table through the `update_watermark` method.
    pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
        self.committed_watermark.as_ref()
    }

    pub async fn commit(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        self.commit_inner(new_epoch, None).await
    }

    #[cfg(test)]
    pub async fn commit_for_test(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.commit_assert_no_update_vnode_bitmap(new_epoch).await
    }

    pub async fn commit_assert_no_update_vnode_bitmap(
        &mut self,
        new_epoch: EpochPair,
    ) -> StreamExecutorResult<()> {
        let post_commit = self.commit_inner(new_epoch, None).await?;
        post_commit.post_yield_barrier(None).await?;
        Ok(())
    }

    pub async fn commit_may_switch_consistent_op(
        &mut self,
        new_epoch: EpochPair,
        op_consistency_level: StateTableOpConsistencyLevel,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        if self.op_consistency_level != op_consistency_level {
            // avoid flooding e2e-test log
            if !cfg!(debug_assertions) {
                info!(
                    ?new_epoch,
                    prev_op_consistency_level = ?self.op_consistency_level,
                    ?op_consistency_level,
                    table_id = self.table_id.table_id,
                    "switch to new op consistency level"
                );
            }
            self.commit_inner(new_epoch, Some(op_consistency_level))
                .await
        } else {
            self.commit_inner(new_epoch, None).await
        }
    }

    async fn commit_inner(
        &mut self,
        new_epoch: EpochPair,
        switch_consistent_op: Option<StateTableOpConsistencyLevel>,
    ) -> StreamExecutorResult<StateTablePostCommit<'_, S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>>
    {
        assert!(!self.on_post_commit);
        assert_eq!(
            self.epoch.expect("should only be called after init").curr,
            new_epoch.prev
        );
        if let Some(new_consistency_level) = switch_consistent_op {
            assert_ne!(self.op_consistency_level, new_consistency_level);
            self.op_consistency_level = new_consistency_level;
        }
        trace!(
            table_id = %self.table_id,
            epoch = ?self.epoch,
            "commit state table"
        );

        let table_watermarks = self.commit_pending_watermark();
        self.row_store
            .seal_current_epoch(new_epoch.curr, table_watermarks, switch_consistent_op)
            .await?;
        self.epoch = Some(new_epoch);

        // Refresh watermark cache if it is out of sync.
        if USE_WATERMARK_CACHE
            && !self.watermark_cache.is_synced()
            && let Some(ref watermark) = self.committed_watermark
        {
            let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
                (Included(once(Some(watermark.clone()))), Unbounded);
            // NOTE(kwannoel): We buffer `pks` before inserting into the watermark cache
            // because we can't hold an immutable ref (via `iter_key_and_val_with_pk_range`)
            // and a mutable ref (via `self.watermark_cache.insert`) at the same time.
            // TODO(kwannoel): We can optimize it with:
            // 1. Either use `RefCell`.
            // 2. Or pass in a direct reference to the LocalStateStore,
            //    instead of referencing it indirectly from `self`.
            //    Similar to how we do it for pk_indices.
            let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
            {
                let mut streams = vec![];
                for vnode in self.vnodes().iter_vnodes() {
                    let stream = self
                        .iter_keyed_row_with_vnode(vnode, &range, PrefetchOptions::default())
                        .await?;
                    streams.push(Box::pin(stream));
                }
                let merged_stream = merge_sort(streams);
                pin_mut!(merged_stream);

                #[for_await]
                for entry in merged_stream.take(self.watermark_cache.capacity()) {
                    let keyed_row = entry?;
                    let pk = self.pk_serde.deserialize(keyed_row.key())?;
                    // The watermark column should be part of the pk.
                    if !pk.is_null_at(self.clean_watermark_index_in_pk.unwrap_or(0) as usize) {
                        pks.push(pk);
                    }
                }
            }

            let mut filler = self.watermark_cache.begin_syncing();
            for pk in pks {
                filler.insert_unchecked(DefaultOrdered(pk), ());
            }
            filler.finish();

            let n_cache_entries = self.watermark_cache.len();
            if n_cache_entries < self.watermark_cache.capacity() {
                self.watermark_cache.set_table_row_count(n_cache_entries);
            }
        }

        self.on_post_commit = true;
        Ok(StateTablePostCommit { inner: self })
    }
1421
    /// Commit the pending watermark and return the vnode bitmap-watermark pairs to seal.
    fn commit_pending_watermark(
        &mut self,
    ) -> Option<(WatermarkDirection, Vec<VnodeWatermark>, WatermarkSerdeType)> {
        let watermark = self.pending_watermark.take()?;
        trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");

        assert!(
            !self.pk_indices().is_empty(),
            "unexpected pending watermark on empty pk"
        );
        let watermark_serializer = self
            .pk_serde
            .index(self.clean_watermark_index_in_pk.unwrap_or(0) as usize);

        let watermark_type = match self.clean_watermark_index_in_pk {
            None | Some(0) => WatermarkSerdeType::PkPrefix,
            Some(_) => WatermarkSerdeType::NonPkPrefix,
        };

        let should_clean_watermark = if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
            if let Some(key) = self.watermark_cache.lowest_key() {
                watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
            } else {
                // The watermark cache is synced but contains no key,
                // which implies the table is empty.
                // In that case there is nothing to clean.
                false
            }
        } else {
            // Either we are not using the watermark cache,
            // or the watermark cache is not synced.
            // In both cases we should clean by watermark.
            true
        };

        let watermark_suffix =
            serialize_pk(row::once(Some(watermark.clone())), &watermark_serializer);

        // Compute the watermark to seal.
        let seal_watermark = if should_clean_watermark {
            trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
                self.vnodes().iter_vnodes().collect_vec()
            }, "delete range");

            let order_type = watermark_serializer.get_order_types().first().unwrap();

            let direction = if order_type.is_ascending() {
                WatermarkDirection::Ascending
            } else {
                WatermarkDirection::Descending
            };
            Some((
                direction,
                VnodeWatermark::new(
                    self.vnodes().clone(),
                    Bytes::copy_from_slice(watermark_suffix.as_ref()),
                ),
                watermark_type,
            ))
        } else {
            None
        };
        self.committed_watermark = Some(watermark);

        // Clear the watermark cache and force a resync.
        // TODO(kwannoel): This can be further optimized:
        // 1. Add a `cache.drain_until` interface, so we only clear the watermark cache
        //    up to the largest end of the delete ranges.
        // 2. Mark the cache as not_synced, so we can still refill it later.
        // 3. When refilling the cache,
        //    refill from the largest value of the cache as the lower bound.
        if USE_WATERMARK_CACHE && seal_watermark.is_some() {
            self.watermark_cache.clear();
        }

        seal_watermark.map(|(direction, watermark, watermark_serde_type)| {
            (direction, vec![watermark], watermark_serde_type)
        })
    }

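    // Worked sketch (hypothetical values) of the sealing above: suppose the clean
    // watermark column is an ascending pk prefix (index 0) and the pending
    // watermark is `100i64`. Then, roughly:
    //
    // ```ignore
    // // watermark_type == WatermarkSerdeType::PkPrefix
    // // watermark_suffix == serialize_pk(once(Some(100i64.into())), &serializer)
    // // returned value:
    // Some((
    //     WatermarkDirection::Ascending,
    //     vec![VnodeWatermark::new(vnodes, watermark_suffix)],
    //     WatermarkSerdeType::PkPrefix,
    // ))
    // ```
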
    pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
        self.row_store.try_flush().await?;
        Ok(())
    }
}

pub trait RowStream<'a> = Stream<Item = StreamExecutorResult<OwnedRow>> + 'a;
pub trait KeyedRowStream<'a> = Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a;
pub trait PkRowStream<'a, K> = Stream<Item = StreamExecutorResult<(K, OwnedRow)>> + 'a;

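// Consumption sketch (illustrative; `table`, `vnode` and `range` are assumed
// bindings): these aliases are `Stream`s, so callers pin and poll them.
//
// ```ignore
// let stream = table.iter_with_vnode(vnode, &range, Default::default()).await?;
// pin_mut!(stream);
// while let Some(row) = stream.next().await {
//     let row: OwnedRow = row?;
//     // ... process the row ...
// }
// ```
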
/// Rebuilds the key type `K` yielded by an iterator from a vnode and the
/// vnode-stripped key bytes.
pub trait FromVnodeBytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self;
}

/// Materialize the full table key, with the vnode prefix restored.
impl FromVnodeBytes for Bytes {
    fn from_vnode_bytes(vnode: VirtualNode, bytes: &Bytes) -> Self {
        prefix_slice_with_vnode(vnode, bytes)
    }
}

/// Discard the key entirely, for callers that only need the rows.
impl FromVnodeBytes for () {
    fn from_vnode_bytes(_vnode: VirtualNode, _bytes: &Bytes) -> Self {}
}

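// Illustrative sketch: the key type parameter lets one iterator implementation
// serve callers that need keys and callers that do not. Both calls below are
// assumed to run where the crate-private `iter_kv_with_pk_range` is visible.
//
// ```ignore
// // Rows only: `()` keys make key reconstruction a no-op.
// let rows = table.iter_kv_with_pk_range::<()>(&range, vnode, opts).await?;
// // Rows with their serialized keys, e.g. to feed `merge_sort`:
// let keyed = table.iter_kv_with_pk_range::<Bytes>(&range, vnode, opts).await?;
// ```
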
// Iterator functions
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_range` under the
    /// same `vnode`.
    pub async fn iter_with_vnode(
        &self,
        // The vnode to iterate within. For now, this parameter is required. In the future,
        // passing `None` could mean iterating over every vnode that the `StateTableInner` owns.
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range::<()>(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn iter_keyed_row_with_vnode(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    pub async fn iter_with_vnode_and_output_indices(
        &self,
        vnode: VirtualNode,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        assert!(IS_REPLICATED);
        let stream = self
            .iter_with_vnode(vnode, pk_range, prefetch_options)
            .await?;
        Ok(stream.map(|row| row.map(|row| row.project(&self.output_indices).into_owned_row())))
    }
}

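// Usage sketch (illustrative; `table` and `vnode` are assumed): scan a single
// vnode over a pk range, e.g. every pk at or above some lower bound.
//
// ```ignore
// let range: (Bound<OwnedRow>, Bound<OwnedRow>) =
//     (Included(OwnedRow::new(vec![Some(10_i32.into())])), Unbounded);
// let stream = table
//     .iter_with_vnode(vnode, &range, PrefetchOptions::default())
//     .await?;
// ```
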
impl<LS: LocalStateStore, SD: ValueRowSerde> StateTableRowStore<LS, SD> {
    async fn iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        // Serve the iteration from memory when all rows are preloaded.
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }

    async fn rev_iter_kv<K: CopyFromSlice + FromVnodeBytes>(
        &self,
        vnode: VirtualNode,
        (start, end): (Bound<Bytes>, Bound<Bytes>),
        prefix_hint: Option<Bytes>,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        // Same as `iter_kv`, but iterating the range in reverse order.
        if let Some(rows) = &self.all_rows {
            return Ok(futures::future::Either::Left(futures::stream::iter(
                rows.get(&vnode)
                    .expect("covered vnode")
                    .range((start, end))
                    .rev()
                    .map(move |(key, value)| Ok((K::from_vnode_bytes(vnode, key), value.clone()))),
            )));
        }
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            prefetch_options,
            cache_policy: CachePolicy::Fill(Hint::Normal),
        };

        Ok(futures::future::Either::Right(
            deserialize_keyed_row_stream(
                self.state_store
                    .rev_iter(prefixed_range_with_vnode((start, end), vnode), read_options)
                    .await?,
                &*self.row_serde,
            ),
        ))
    }
}

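// Design note (editorial): both methods return `futures::future::Either` so that
// the in-memory and storage-backed paths unify under one `impl Stream` return
// type without boxing. A minimal standalone sketch of the same pattern, using
// `either::Either` over iterators:
//
// ```ignore
// fn numbers(cached: bool) -> impl Iterator<Item = u32> {
//     if cached {
//         either::Either::Left([1, 2, 3].into_iter())
//     } else {
//         either::Either::Right(0..3)
//     }
// }
// ```
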
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
    StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
    S: StateStore,
    SD: ValueRowSerde,
{
    /// This function scans rows from the relational table with the specific `pk_prefix` and
    /// `sub_range` under the same `vnode`. If `sub_range` is `(Unbounded, Unbounded)`, it scans
    /// all rows with the given `pk_prefix`. The `pk_prefix` also identifies the exact vnode
    /// that the scan is performed on.
    pub async fn iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        let stream = self
            .iter_with_prefix_inner::</* REVERSE */ false, ()>(pk_prefix, sub_range, prefetch_options)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Get the row from a state table that contains at most one row.
    pub async fn get_from_one_row_table(&self) -> StreamExecutorResult<Option<OwnedRow>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Unbounded, Unbounded);
        let stream = self
            .iter_with_prefix(row::empty(), sub_range, Default::default())
            .await?;
        pin_mut!(stream);

        if let Some(res) = stream.next().await {
            let value = res?.into_owned_row();
            assert!(stream.next().await.is_none());
            Ok(Some(value))
        } else {
            Ok(None)
        }
    }

    /// Get the value from a state table that contains at most one row, where the row has only
    /// one column.
    ///
    /// `None` can mean either that the row was never persisted, or that a persisted `NULL` was
    /// read, which does not matter in the use case.
    pub async fn get_from_one_value_table(&self) -> StreamExecutorResult<Option<ScalarImpl>> {
        Ok(self
            .get_from_one_row_table()
            .await?
            .and_then(|row| row[0].clone()))
    }

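    // Usage sketch (illustrative; the surrounding executor is assumed): "one value"
    // tables persist a single scalar per operator, restored on startup.
    //
    // ```ignore
    // let restored: Option<ScalarImpl> = state_table.get_from_one_value_table().await?;
    // ```
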
    pub async fn iter_keyed_row_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ false, Bytes>(
                pk_prefix,
                sub_range,
                prefetch_options,
            )
            .await?
            .map_ok(|(key, row)| KeyedRow::new(TableKey(key), row)))
    }

    /// This function scans the table just like `iter_with_prefix`, but in reverse order.
    pub async fn rev_iter_with_prefix(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl RowStream<'_>> {
        Ok(self
            .iter_with_prefix_inner::</* REVERSE */ true, ()>(pk_prefix, sub_range, prefetch_options)
            .await?
            .map_ok(|(_, row)| row))
    }

    async fn iter_with_prefix_inner<const REVERSE: bool, K: CopyFromSlice + FromVnodeBytes>(
        &self,
        pk_prefix: impl Row,
        sub_range: &(Bound<impl Row>, Bound<impl Row>),
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'_, K>> {
        let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
        let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);

        // We assume that all usages of iterating the state table only access a single vnode.
        // If this assertion fails, then something must be wrong with the operator implementation
        // or the distribution derivation from the optimizer.
        let vnode = self.compute_prefix_vnode(&pk_prefix);

        // Construct the prefix hint for the prefix bloom filter.
        let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
        if self.prefix_hint_len != 0 {
            debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
        }
        let prefix_hint = {
            if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
                None
            } else {
                let encoded_prefix_len = self
                    .pk_serde
                    .deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;

                Some(Bytes::copy_from_slice(
                    &encoded_prefix[..encoded_prefix_len],
                ))
            }
        };

        trace!(
            table_id = %self.table_id(),
            ?prefix_hint, ?pk_prefix,
            ?pk_prefix_indices,
            iter_direction = if REVERSE { "reverse" } else { "forward" },
            "storage_iter_with_prefix"
        );

        let memcomparable_range =
            prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);

        Ok(if REVERSE {
            futures::future::Either::Left(
                self.row_store
                    .rev_iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        } else {
            futures::future::Either::Right(
                self.row_store
                    .iter_kv(vnode, memcomparable_range, prefix_hint, prefetch_options)
                    .await?,
            )
        })
    }

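    // Worked sketch (hypothetical values) of the prefix-hint rule above:
    //
    // ```ignore
    // // pk columns: (a, b, c), prefix_hint_len == 2
    // // pk_prefix = (a0, b0) => prefix_hint == Some(<serialized (a0, b0)>)
    // // pk_prefix = (a0,)    => prefix_hint == None, since 2 > pk_prefix.len()
    // //   (and the debug_assert above fires in debug builds)
    // ```
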
    /// This function scans raw key-values from the relational table with the specific `pk_range`
    /// under the same `vnode`.
    async fn iter_kv_with_pk_range<'a, K: CopyFromSlice + FromVnodeBytes>(
        &'a self,
        pk_range: &(Bound<impl Row>, Bound<impl Row>),
        // The vnode to iterate within. For now, this parameter is required. In the future,
        // passing `None` could mean iterating over every vnode that the `StateTableInner` owns.
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StreamExecutorResult<impl PkRowStream<'a, K>> {
        let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);

        // TODO: provide a trace of useful params.
        self.row_store
            .iter_kv(vnode, memcomparable_range, None, prefetch_options)
            .await
    }

    #[cfg(test)]
    pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
        &self.watermark_cache
    }
}

fn deserialize_keyed_row_stream<'a, K: CopyFromSlice>(
    iter: impl StateStoreIter + 'a,
    deserializer: &'a impl ValueRowSerde,
) -> impl PkRowStream<'a, K> {
    iter.into_stream(move |(key, value)| {
        Ok((
            K::copy_from_slice(key.user_key.table_key.as_ref()),
            deserializer.deserialize(value).map(OwnedRow::new)?,
        ))
    })
    .map_err(Into::into)
}

pub fn prefix_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
    (
        start_range_to_memcomparable(pk_serde, &range.0),
        end_range_to_memcomparable(pk_serde, &range.1, None),
    )
}

fn prefix_and_sub_range_to_memcomparable(
    pk_serde: &OrderedRowSerde,
    sub_range: &(Bound<impl Row>, Bound<impl Row>),
    pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
    let (range_start, range_end) = sub_range;
    let prefix_serializer = pk_serde.prefix(pk_prefix.len());
    let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
    let start_range = match range_start {
        Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
        Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
        Unbounded => Bound::Included(Either::Right(&pk_prefix)),
    };
    let end_range = match range_end {
        Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
        Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
        Unbounded => Unbounded,
    };
    (
        start_range_to_memcomparable(pk_serde, &start_range),
        end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
    )
}

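// Worked sketch (hypothetical values) of the computation above:
//
// ```ignore
// // pk_prefix = (1,), sub_range = (Included((2,)), Unbounded)
// // start = Included(serialize_pk((1, 2)))           // prefix chained with the sub-range start
// // end   = end_bound_of_prefix(serialize_pk((1,)))  // everything under the prefix
// ```
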
fn start_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => Unbounded,
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            Included(serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            start_bound_of_excluded_prefix(&serialized)
        }
    }
}

fn end_range_to_memcomparable<R: Row>(
    pk_serde: &OrderedRowSerde,
    bound: &Bound<R>,
    serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
    let serialize_pk_prefix = |pk_prefix: &R| {
        let prefix_serializer = pk_serde.prefix(pk_prefix.len());
        serialize_pk(pk_prefix, &prefix_serializer)
    };
    match bound {
        Unbounded => match serialized_pk_prefix {
            Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
            None => Unbounded,
        },
        Included(r) => {
            let serialized = serialize_pk_prefix(r);
            end_bound_of_prefix(&serialized)
        }
        Excluded(r) => {
            let serialized = serialize_pk_prefix(r);
            Excluded(serialized)
        }
    }
}

fn fill_non_output_indices(
    i2o_mapping: &ColIndexMapping,
    data_types: &[DataType],
    chunk: StreamChunk,
) -> StreamChunk {
    let cardinality = chunk.cardinality();
    let (ops, columns, vis) = chunk.into_inner();
    let mut full_columns = Vec::with_capacity(data_types.len());
    for (i, data_type) in data_types.iter().enumerate() {
        if let Some(j) = i2o_mapping.try_map(i) {
            full_columns.push(columns[j].clone());
        } else {
            // Pad non-output columns with NULLs of the proper type.
            let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
            column_builder.append_n_null(cardinality);
            let column: ArrayRef = column_builder.finish().into();
            full_columns.push(column)
        }
    }
    let data_chunk = DataChunk::new(full_columns, vis);
    StreamChunk::from_parts(ops, data_chunk)
}

#[cfg(test)]
mod tests {
    use std::fmt::Debug;

    use expect_test::{Expect, expect};

    use super::*;

    fn check(actual: impl Debug, expect: Expect) {
        let actual = format!("{:#?}", actual);
        expect.assert_eq(&actual);
    }

    #[test]
    fn test_fill_non_output_indices() {
        let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
        let replicated_chunk = [OwnedRow::new(vec![
            Some(222_i32.into()),
            Some(2_i32.into()),
        ])];
        let replicated_chunk = StreamChunk::from_parts(
            vec![Op::Insert],
            DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
        );
        let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
        let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
        check(
            filled_chunk,
            expect![[r#"
            StreamChunk { cardinality: 1, capacity: 1, data:
            +---+---+---+-----+
            | + | 2 |   | 222 |
            +---+---+---+-----+
             }"#]],
        );
    }
}
1975}