risingwave_storage/table/batch_table/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::{Bytes, BytesMut};
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
};
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;

mod vector_index_reader;
pub use vector_index_reader::VectorIndexReader;

use crate::StateStore;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{ColumnMapping, find_columns_by_ids};
use crate::store::{
    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
    StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};

/// [`BatchTableInner`] is the interface for accessing relational data in the KV state store
/// (`StateStore`) with a row-based encoding format. It is used in batch mode.
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    store: S,

    /// The schema of the output columns, i.e., this table VIEWED BY some executor like
    /// `RowSeqScanExecutor`.
    schema: Schema,

    /// Used for serializing and deserializing the primary key.
    pk_serializer: OrderedRowSerde,

    output_indices: Vec<usize>,

    /// The key part of `output_indices`.
    key_output_indices: Option<Vec<usize>>,

    /// The value part of `output_indices`.
    value_output_indices: Vec<usize>,

    /// Used for deserializing the key part of the output row from the pk.
    output_row_in_key_indices: Vec<usize>,

    /// Mapping from column id to column index for deserializing the row.
    mapping: Arc<ColumnMapping>,

    /// The index of the system column `_rw_timestamp` in the output columns.
    epoch_idx: Option<usize>,

    /// Row deserializer to deserialize the value in storage to a row.
    /// The row can be either complete or partial, depending on whether the row encoding is versioned.
    row_serde: Arc<SD>,

    /// Indices of primary key.
    /// Note that the indices are based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    distribution: TableDistribution,

    /// Used for catalog `table_properties`.
    table_option: TableOption,

    read_prefix_len_hint: usize,
}

/// `BatchTable` uses [`EitherSerde`] by default so that both versioned and
/// non-versioned tables can be supported with the same type.
pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BatchTableInner").finish_non_exhaustive()
    }
}

// init
impl<S: StateStore> BatchTableInner<S, EitherSerde> {
    /// Creates a [`BatchTableInner`] given a complete set of `columns` and a partial
    /// set of `output_column_ids`.
    /// When reading from the storage table,
    /// the chunks or rows will only contain columns with the given ids (`output_column_ids`),
    /// in the same order as the given `output_column_ids`.
    ///
    /// NOTE(kwannoel): The `output_column_ids` here may be slightly different
    /// from those supplied to associated executors.
    /// These `output_column_ids` may have `pk` appended, since they will be needed to scan from
    /// storage. The associated executors may not have these `pk` fields.
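    ///
    /// A construction sketch (illustrative; `store` and `table_desc` are assumed to
    /// exist, and the column ids are examples rather than real catalog ids):
    ///
    /// ```ignore
    /// let table = BatchTable::new_partial(
    ///     store,
    ///     vec![ColumnId::new(0), ColumnId::new(2)],
    ///     None, // vnode bitmap; see `TableDistribution::new_from_storage_table_desc`
    ///     &table_desc,
    /// );
    /// ```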
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = table_desc.table_id;
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        // system column currently only contains `_rw_timestamp`
        let mut epoch_idx = None;

        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    pub fn pk_serializer(&self) -> &OrderedRowSerde {
        &self.pk_serializer
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn output_indices(&self) -> &[usize] {
        &self.output_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
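    ///
    /// For example (illustrative): with `pk_indices = [1, 0]` and
    /// `output_indices = [2, 1, 0]`, this returns `Some(vec![1, 2])`; if column `1`
    /// were absent from the output columns, it would return `None`.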
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }
}

/// Point get
impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Get a single row by point get.
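    ///
    /// A usage sketch (illustrative; `table`, `pk`, and `epoch` are assumed to exist):
    ///
    /// ```ignore
    /// let row: Option<OwnedRow> = table
    ///     .get_row(&pk, HummockReadEpoch::Committed(epoch))
    ///     .await?;
    /// ```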
    pub async fn get_row(
        &self,
        pk: impl Row,
        wait_epoch: HummockReadEpoch,
    ) -> StorageResult<Option<OwnedRow>> {
        self.store
            .try_wait_epoch(
                wait_epoch,
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let serialized_pk = serialize_pk_with_vnode(
            &pk,
            &self.pk_serializer,
            self.distribution.compute_vnode_by_pk(&pk),
        );
        assert!(pk.len() <= self.pk_indices.len());

        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
        {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        match read_snapshot
            .on_key_value(serialized_pk, read_options, move |key, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok((key.epoch_with_gap.pure_epoch(), row))
            })
            .await?
        {
            Some((epoch, row)) => {
                let result_row_in_value = self.mapping.project(OwnedRow::new(row));

                match &self.key_output_indices {
                    Some(key_output_indices) => {
                        let result_row_in_key =
                            pk.project(&self.output_row_in_key_indices).into_owned_row();
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if let Some(epoch_idx) = self.epoch_idx
                                && *idx == epoch_idx
                            {
                                let epoch = Epoch::from(epoch);
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else if self.value_output_indices.contains(idx) {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            } else {
                                let item_position_in_pk_indices =
                                    key_output_indices.iter().position(|p| idx == p).unwrap();
                                result_row_vec.push(
                                    result_row_in_key
                                        .datum_at(item_position_in_pk_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        let result_row = OwnedRow::new(result_row_vec);
                        Ok(Some(result_row))
                    }
                    None => match &self.epoch_idx {
                        Some(epoch_idx) => {
                            let mut result_row_vec = vec![];
                            for idx in &self.output_indices {
                                if idx == epoch_idx {
                                    let epoch = Epoch::from(epoch);
                                    result_row_vec.push(risingwave_common::types::Datum::from(
                                        epoch.as_scalar(),
                                    ));
                                } else {
                                    let item_position_in_value_indices = &self
                                        .value_output_indices
                                        .iter()
                                        .position(|p| idx == p)
                                        .unwrap();
                                    result_row_vec.push(
                                        result_row_in_value
                                            .datum_at(*item_position_in_value_indices)
                                            .to_owned_datum(),
                                    );
                                }
                            }
                            let result_row = OwnedRow::new(result_row_vec);
                            Ok(Some(result_row))
                        }
                        None => Ok(Some(result_row_in_value.into_owned_row())),
                    },
                }
            }
            _ => Ok(None),
        }
    }

    /// Updates the vnode bitmap of the storage table, returning the previous vnode bitmap.
    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
}

/// The row iterator of the storage table.
/// Wraps a stream of `StorageResult<OwnedRow>` items for the case where the pk is not persisted.
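///
/// A consumption sketch (illustrative; `iter` is any pinned stream of `StorageResult<OwnedRow>`):
///
/// ```ignore
/// while let Some(row) = iter.next_row().await? {
///     // process `row`
/// }
/// ```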
impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
        self.next().await.transpose()
    }
}

mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        Single(RowSt),
        Unordered(Vec<RowSt>),
        Ordered(Vec<KeyedRowSt>),
    }

    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    pub(super) type SortKeyType = Bytes; // TODO: may use Vec

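    /// Merges the per-vnode streams into a single stream. A single vnode needs no
    /// merging, multiple unordered streams are flattened concurrently, and multiple
    /// ordered streams are merge-sorted by their vnode-prefixed keys.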
    #[define_opaque(MergedVnodeStream)]
    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}

use merge_vnode_stream::*;

async fn build_vnode_stream<
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let stream = match vnodes {
        [] => unreachable!(),
        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
        // Concatenate all iterators if order need not be preserved.
        vnodes if !ordered => VnodeStreamType::Unordered(
            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
        ),
        // Merge all iterators if order must be preserved.
        vnodes => VnodeStreamType::Ordered(
            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
        ),
    };
    Ok(merge_stream(stream))
}

/// Iterators
impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Get streams of `StorageResult<OwnedRow>` items over the vnodes of this table selected
    /// by `vnode_hint`, and merge or concatenate them according to `ordered`.
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // If `vnode_hint` is set, we can only access this single vnode.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            // Otherwise, we need to access all vnodes of this table.
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;

        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
        &self,
        read_snapshot: &S::ReadSnapshot,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
    {
        let cache_policy = match &encoded_key_range {
            // To prevent unbounded range scan queries from polluting the block cache, use the
            // low priority fill policy.
            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
            _ => CachePolicy::Fill(Hint::Normal),
        };

        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);

        {
            let prefix_hint = prefix_hint.clone();
            {
                let read_options = ReadOptions {
                    prefix_hint,
                    retention_seconds: self.table_option.retention_seconds,
                    prefetch_options,
                    cache_policy,
                };
                let pk_serializer = match self.output_row_in_key_indices.is_empty() {
                    true => None,
                    false => Some(Arc::new(self.pk_serializer.clone())),
                };
                let iter = BatchTableInnerIterInner::new(
                    read_snapshot,
                    self.mapping.clone(),
                    self.epoch_idx,
                    pk_serializer,
                    self.output_indices.clone(),
                    self.key_output_indices.clone(),
                    self.value_output_indices.clone(),
                    self.output_row_in_key_indices.clone(),
                    self.row_serde.clone(),
                    table_key_range,
                    read_options,
                )
                .await?
                .into_stream::<K>();
                Ok(iter)
            }
        }
    }

    // TODO: directly use `prefixed_range`.
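    /// Serializes one bound of a pk range under the given `pk_prefix`. For example
    /// (illustrative): for a two-column pk, a prefix `(1,)` with end bound
    /// `Included((5,))` serializes `(1, 5)` and widens it with `end_bound_of_prefix`,
    /// so that longer keys starting with `(1, 5)` are still covered.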
    fn serialize_pk_bound(
        &self,
        pk_prefix: impl Row,
        range_bound: Bound<&OwnedRow>,
        is_start_bound: bool,
    ) -> Bound<Bytes> {
        match range_bound {
            Included(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    Included(serialized_key)
                } else {
                    // Should use the excluded next key for the end bound.
                    // Otherwise, keys starting with the bound are not included.
                    end_bound_of_prefix(&serialized_key)
                }
            }
            Excluded(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    // Storage doesn't support an excluded begin key yet, so transform it to
                    // included.
                    // A null datum is always serialized with a u8 that is not equal to `\xff`,
                    // so we can assert that `next_key` never returns an empty key.
                    let next_serialized_key = next_key(&serialized_key);
                    assert!(!next_serialized_key.is_empty());
                    Included(Bytes::from(next_serialized_key))
                } else {
                    Excluded(serialized_key)
                }
            }
            Unbounded => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
                if pk_prefix.is_empty() {
                    Unbounded
                } else if is_start_bound {
                    Included(serialized_pk_prefix)
                } else {
                    end_bound_of_prefix(&serialized_pk_prefix)
                }
            }
        }
    }

    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
    async fn iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        assert!(pk_prefix.len() <= self.pk_indices.len());
        let pk_prefix_indices = (0..pk_prefix.len())
            .map(|index| self.pk_indices[index])
            .collect_vec();

        let prefix_hint = if self.read_prefix_len_hint != 0
            && self.read_prefix_len_hint <= pk_prefix.len()
        {
            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
                start_key
            } else {
                unreachable!()
            };
            let prefix_len = self
                .pk_serializer
                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
        } else {
            trace!(
                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?}  pk_prefix_indices {:?}",
                self.table_id, pk_prefix, pk_prefix_indices
            );
            None
        };

        trace!(
            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?}  pk_prefix_indices {:?}",
            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
        );

        self.iter_with_encoded_key_range(
            prefix_hint,
            (start_key, end_key),
            epoch,
            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
            ordered,
            prefetch_options,
        )
        .await
    }

    // Construct a stream of (columns, row_count) from a row stream.
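    // For example (illustrative): with `chunk_size = 1024` and 2500 input rows,
    // the stream yields column sets of 1024, 1024, and finally 452 rows.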
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // Uses `ArrayBuilderImpl` instead of `DataChunkBuilder` here to demonstrate how to
            // build a chunk in a columnar manner.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            // Yield the last chunk if any.
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }

    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
    /// Returns a stream of chunks of columns with the provided `chunk_size`.
    async fn chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
        let iter = self
            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await?;

        Ok(Self::convert_row_stream_to_array_vec_stream(
            iter,
            self.schema.clone(),
            chunk_size,
        ))
    }

    /// Constructs a stream of `StorageResult<OwnedRow>` items for batch executors.
    /// Unlike the streaming iterator, this one waits for the epoch before iterating.
    pub async fn batch_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await
    }

    // The returned iterator will iterate data from a snapshot corresponding to the given `epoch`.
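    //
    // A full-scan sketch (illustrative; `table` and `epoch` are assumed to exist):
    //
    //     let iter = table
    //         .batch_iter(HummockReadEpoch::Committed(epoch), false, PrefetchOptions::default())
    //         .await?;
    //     futures::pin_mut!(iter);
    //     while let Some(row) = iter.next_row().await? { /* ... */ }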
    pub async fn batch_iter(
        &self,
        epoch: HummockReadEpoch,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
            .await
    }

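    // A per-vnode scan sketch (illustrative): resume one vnode's scan from an
    // optional `start_pk`; note that the start bound is inclusive.
    //
    //     let stream = table
    //         .batch_iter_vnode(epoch, Some(&last_pk), vnode, PrefetchOptions::default())
    //         .await?;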
    pub async fn batch_iter_vnode(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        Ok(self
            .iter_vnode_with_encoded_key_range::<()>(
                &read_snapshot,
                None,
                (start_bound.as_ref(), Unbounded),
                vnode,
                prefetch_options,
            )
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
        self.store
            .next_epoch(
                epoch,
                NextEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub async fn batch_iter_vnode_log(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let stream = self
            .batch_iter_log_inner::<()>(
                start_epoch,
                end_epoch,
                (start_bound.as_ref(), Unbounded),
                vnode,
            )
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    pub async fn batch_iter_log_with_pk_bounds(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        ordered: bool,
        range_bounds: impl RangeBounds<OwnedRow>,
        pk_prefix: impl Row,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
        build_vnode_stream(
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn batch_iter_log_inner<K: CopyFromSlice>(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
    {
        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadLogOptions {
            table_id: self.table_id,
        };
        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
            &self.store,
            self.mapping.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
            start_epoch,
            end_epoch,
        )
        .await?
        .into_stream::<K>();

        Ok(iter)
    }

    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
    /// Returns a stream of `DataChunk` with the provided `chunk_size`.
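    ///
    /// A usage sketch (illustrative; `table` and `epoch` are assumed to exist):
    ///
    /// ```ignore
    /// let chunks = table
    ///     .batch_chunk_iter_with_pk_bounds(
    ///         epoch,
    ///         row::empty(),
    ///         ..,
    ///         false,
    ///         1024,
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// ```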
    pub async fn batch_chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
        let iter = self
            .chunk_iter_with_pk_bounds(
                epoch,
                pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                prefetch_options,
            )
            .await?;

        Ok(iter.map(|item| {
            let (columns, row_count) = item?;
            Ok(DataChunk::new(columns, row_count))
        }))
    }
}

/// [`BatchTableInnerIterInner`] iterates on the storage table.
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: SI,

    mapping: Arc<ColumnMapping>,

    /// The index of the system column `_rw_timestamp` in the output columns.
    epoch_idx: Option<usize>,

    row_deserializer: Arc<SD>,

    /// Used for serializing and deserializing the primary key.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    output_indices: Vec<usize>,

    /// The key part of `output_indices`.
    key_output_indices: Option<Vec<usize>>,

    /// The value part of `output_indices`.
    value_output_indices: Vec<usize>,

    /// Used for deserializing the key part of the output row from the pk.
    output_row_in_key_indices: Vec<usize>,
}

impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
    /// Creates a new iterator over the given table key range. Epoch waiting is expected to be
    /// handled by the caller, e.g., when creating the read snapshot.
    #[allow(clippy::too_many_arguments)]
    async fn new<S>(
        store: &S,
        mapping: Arc<ColumnMapping>,
        epoch_idx: Option<usize>,
        pk_serializer: Option<Arc<OrderedRowSerde>>,
        output_indices: Vec<usize>,
        key_output_indices: Option<Vec<usize>>,
        value_output_indices: Vec<usize>,
        output_row_in_key_indices: Vec<usize>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self>
    where
        S: StateStoreRead<Iter = SI>,
    {
        let iter = store.iter(table_key_range, read_options).await?;
        let iter = Self {
            iter,
            mapping,
            epoch_idx,
            row_deserializer,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
        };
        Ok(iter)
    }

    /// Yield a row with its primary key.
    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
    async fn into_stream<K: CopyFromSlice>(mut self) {
        while let Some((k, v)) = self
            .iter
            .try_next()
            .instrument_await("storage_table_iter_next".verbose())
            .await?
        {
            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
            let row = self.row_deserializer.deserialize(value)?;
            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
            let row = match &self.key_output_indices {
                Some(key_output_indices) => {
                    let result_row_in_key = match self.pk_serializer.clone() {
                        Some(pk_serializer) => {
                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;

                            pk.project(&self.output_row_in_key_indices).into_owned_row()
                        }
                        None => OwnedRow::empty(),
                    };

                    let mut result_row_vec = vec![];
                    for idx in &self.output_indices {
                        if let Some(epoch_idx) = self.epoch_idx
                            && *idx == epoch_idx
                        {
                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                            result_row_vec
                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                        } else if self.value_output_indices.contains(idx) {
                            let item_position_in_value_indices = &self
                                .value_output_indices
                                .iter()
                                .position(|p| idx == p)
                                .unwrap();
                            result_row_vec.push(
                                result_row_in_value
                                    .datum_at(*item_position_in_value_indices)
                                    .to_owned_datum(),
                            );
                        } else {
                            let item_position_in_pk_indices =
                                key_output_indices.iter().position(|p| idx == p).unwrap();
                            result_row_vec.push(
                                result_row_in_key
                                    .datum_at(item_position_in_pk_indices)
                                    .to_owned_datum(),
                            );
                        }
                    }
                    OwnedRow::new(result_row_vec)
                }
                None => match &self.epoch_idx {
                    Some(epoch_idx) => {
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if idx == epoch_idx {
                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        OwnedRow::new(result_row_vec)
                    }
                    None => result_row_in_value.into_owned_row(),
                },
            };
            yield (K::copy_from_slice(table_key.as_ref()), row);
        }
    }
}

/// [`BatchTableInnerIterLogInner`] iterates over the change log of the storage table.
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: S::ChangeLogIter,

    mapping: Arc<ColumnMapping>,

    row_deserializer: Arc<SD>,
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
    /// Waits for `end_epoch` to be committed before creating the change log iterator.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        store: &S,
        mapping: Arc<ColumnMapping>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadLogOptions,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
    ) -> StorageResult<Self> {
        store
            .try_wait_epoch(
                end_epoch,
                TryWaitEpochOptions {
                    table_id: read_options.table_id,
                },
            )
            .await?;
        let iter = store
            .iter_log(
                (start_epoch, end_epoch.get_epoch()),
                table_key_range,
                read_options,
            )
            .await?;
        let iter = Self {
            iter,
            mapping,
            row_deserializer,
        };
        Ok(iter)
    }

    /// Yield a change log row with its primary key.
    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
        self.iter.into_stream(move |(table_key, value)| {
            value
                .try_map(|value| {
                    let full_row = self.row_deserializer.deserialize(value)?;
                    let row = self
                        .mapping
                        .project(OwnedRow::new(full_row))
                        .into_owned_row();
                    Ok(row)
                })
                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
        })
    }
}