risingwave_storage/table/batch_table/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::ops::Bound::{self, Excluded, Included, Unbounded};
17use std::ops::RangeBounds;
18use std::sync::Arc;
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::{Bytes, BytesMut};
22use foyer::CacheHint;
23use futures::future::try_join_all;
24use futures::{Stream, StreamExt, TryStreamExt};
25use futures_async_stream::try_stream;
26use itertools::Itertools;
27use more_asserts::assert_gt;
28use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
29use risingwave_common::bitmap::Bitmap;
30use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
31use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
32use risingwave_common::row::{self, OwnedRow, Row, RowExt};
33use risingwave_common::types::ToOwnedDatum;
34use risingwave_common::util::epoch::Epoch;
35use risingwave_common::util::row_serde::*;
36use risingwave_common::util::sort_util::OrderType;
37use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
38use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
39use risingwave_hummock_sdk::HummockReadEpoch;
40use risingwave_hummock_sdk::key::{
41    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
42};
43use risingwave_pb::plan_common::StorageTableDesc;
44use tracing::trace;
45
46use crate::StateStore;
47use crate::error::{StorageError, StorageResult};
48use crate::hummock::CachePolicy;
49use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
50use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
51use crate::row_serde::{ColumnMapping, find_columns_by_ids};
52use crate::store::{
53    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
54    StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
55};
56use crate::table::merge_sort::NodePeek;
57use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
58
59/// [`BatchTableInner`] is the interface accessing relational data in KV(`StateStore`) with
60/// row-based encoding format, and is used in batch mode.
61#[derive(Clone)]
62pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
63    /// Id for this table.
64    table_id: TableId,
65
66    /// State store backend.
67    store: S,
68
69    /// The schema of the output columns, i.e., this table VIEWED BY some executor like
70    /// `RowSeqScanExecutor`.
71    schema: Schema,
72
73    /// Used for serializing and deserializing the primary key.
74    pk_serializer: OrderedRowSerde,
75
76    output_indices: Vec<usize>,
77
78    /// the key part of `output_indices`.
79    key_output_indices: Option<Vec<usize>>,
80
81    /// the value part of `output_indices`.
82    value_output_indices: Vec<usize>,
83
84    /// used for deserializing key part of output row from pk.
85    output_row_in_key_indices: Vec<usize>,
86
87    /// Mapping from column id to column index for deserializing the row.
88    mapping: Arc<ColumnMapping>,
89
90    /// The index of system column `_rw_timestamp` in the output columns.
91    epoch_idx: Option<usize>,
92
93    /// Row deserializer to deserialize the value in storage to a row.
94    /// The row can be either complete or partial, depending on whether the row encoding is versioned.
95    row_serde: Arc<SD>,
96
97    /// Indices of primary key.
98    /// Note that the index is based on the all columns of the table, instead of the output ones.
99    // FIXME: revisit constructions and usages.
100    pk_indices: Vec<usize>,
101
102    distribution: TableDistribution,
103
104    /// Used for catalog `table_properties`
105    table_option: TableOption,
106
107    read_prefix_len_hint: usize,
108}
109
110/// `BatchTable` will use [`EitherSerde`] as default so that we can support both versioned and
111/// non-versioned tables with the same type.
112pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;
113
114impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("BatchTableInner").finish_non_exhaustive()
117    }
118}
119
120// init
121impl<S: StateStore> BatchTableInner<S, EitherSerde> {
122    /// Create a  [`BatchTableInner`] given a complete set of `columns` and a partial
123    /// set of `output_column_ids`.
124    /// When reading from the storage table,
125    /// the chunks or rows will only contain columns with the given ids (`output_column_ids`).
126    /// They will in the same order as the given `output_column_ids`.
127    ///
128    /// NOTE(kwannoel): The `output_column_ids` here may be slightly different
129    /// from those supplied to associated executors.
130    /// These `output_column_ids` may have `pk` appended, since they will be needed to scan from
131    /// storage. The associated executors may not have these `pk` fields.
132    pub fn new_partial(
133        store: S,
134        output_column_ids: Vec<ColumnId>,
135        vnodes: Option<Arc<Bitmap>>,
136        table_desc: &StorageTableDesc,
137    ) -> Self {
138        let table_id = TableId {
139            table_id: table_desc.table_id,
140        };
141        let column_descs = table_desc
142            .columns
143            .iter()
144            .map(ColumnDesc::from)
145            .collect_vec();
146        let order_types: Vec<OrderType> = table_desc
147            .pk
148            .iter()
149            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
150            .collect();
151
152        let pk_indices = table_desc
153            .pk
154            .iter()
155            .map(|k| k.column_index as usize)
156            .collect_vec();
157
158        let table_option = TableOption {
159            retention_seconds: table_desc.retention_seconds,
160        };
161        let value_indices = table_desc
162            .get_value_indices()
163            .iter()
164            .map(|&k| k as usize)
165            .collect_vec();
166        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
167        let versioned = table_desc.versioned;
168        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);
169
170        Self::new_inner(
171            store,
172            table_id,
173            column_descs,
174            output_column_ids,
175            order_types,
176            pk_indices,
177            distribution,
178            table_option,
179            value_indices,
180            prefix_hint_len,
181            versioned,
182        )
183    }
184
185    pub fn for_test_with_partial_columns(
186        store: S,
187        table_id: TableId,
188        columns: Vec<ColumnDesc>,
189        output_column_ids: Vec<ColumnId>,
190        order_types: Vec<OrderType>,
191        pk_indices: Vec<usize>,
192        value_indices: Vec<usize>,
193    ) -> Self {
194        Self::new_inner(
195            store,
196            table_id,
197            columns,
198            output_column_ids,
199            order_types,
200            pk_indices,
201            TableDistribution::singleton(),
202            Default::default(),
203            value_indices,
204            0,
205            false,
206        )
207    }
208
209    pub fn for_test(
210        store: S,
211        table_id: TableId,
212        columns: Vec<ColumnDesc>,
213        order_types: Vec<OrderType>,
214        pk_indices: Vec<usize>,
215        value_indices: Vec<usize>,
216    ) -> Self {
217        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
218        Self::for_test_with_partial_columns(
219            store,
220            table_id,
221            columns,
222            output_column_ids,
223            order_types,
224            pk_indices,
225            value_indices,
226        )
227    }
228
229    #[allow(clippy::too_many_arguments)]
230    fn new_inner(
231        store: S,
232        table_id: TableId,
233        table_columns: Vec<ColumnDesc>,
234        output_column_ids: Vec<ColumnId>,
235        order_types: Vec<OrderType>,
236        pk_indices: Vec<usize>,
237        distribution: TableDistribution,
238        table_option: TableOption,
239        value_indices: Vec<usize>,
240        read_prefix_len_hint: usize,
241        versioned: bool,
242    ) -> Self {
243        assert_eq!(order_types.len(), pk_indices.len());
244
245        let (output_columns, output_indices) =
246            find_columns_by_ids(&table_columns, &output_column_ids);
247
248        let mut value_output_indices = vec![];
249        let mut key_output_indices = vec![];
250        // system column currently only contains `_rw_timestamp`
251        let mut epoch_idx = None;
252
253        for idx in &output_indices {
254            if value_indices.contains(idx) {
255                value_output_indices.push(*idx);
256            } else if pk_indices.contains(idx) {
257                key_output_indices.push(*idx);
258            } else {
259                assert!(epoch_idx.is_none());
260                epoch_idx = Some(*idx);
261            }
262        }
263
264        let output_row_in_key_indices = key_output_indices
265            .iter()
266            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
267            .collect_vec();
268        let schema = Schema::new(output_columns.iter().map(Into::into).collect());
269
270        let pk_data_types = pk_indices
271            .iter()
272            .map(|i| table_columns[*i].data_type.clone())
273            .collect();
274        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
275        let (row_serde, mapping) = {
276            if versioned {
277                let value_output_indices_dedup = value_output_indices
278                    .iter()
279                    .unique()
280                    .copied()
281                    .collect::<Vec<_>>();
282                let output_row_in_value_output_indices_dedup = value_output_indices
283                    .iter()
284                    .map(|&di| {
285                        value_output_indices_dedup
286                            .iter()
287                            .position(|&pi| di == pi)
288                            .unwrap()
289                    })
290                    .collect_vec();
291                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
292                let serde =
293                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
294                (serde.into(), mapping)
295            } else {
296                let output_row_in_value_indices = value_output_indices
297                    .iter()
298                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
299                    .collect_vec();
300                let mapping = ColumnMapping::new(output_row_in_value_indices);
301                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
302                (serde.into(), mapping)
303            }
304        };
305
306        let key_output_indices = match key_output_indices.is_empty() {
307            true => None,
308            false => Some(key_output_indices),
309        };
310        Self {
311            table_id,
312            store,
313            schema,
314            pk_serializer,
315            output_indices,
316            key_output_indices,
317            value_output_indices,
318            output_row_in_key_indices,
319            mapping: Arc::new(mapping),
320            epoch_idx,
321            row_serde: Arc::new(row_serde),
322            pk_indices,
323            distribution,
324            table_option,
325            read_prefix_len_hint,
326        }
327    }
328}
329
330impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
331    pub fn pk_serializer(&self) -> &OrderedRowSerde {
332        &self.pk_serializer
333    }
334
335    pub fn schema(&self) -> &Schema {
336        &self.schema
337    }
338
339    pub fn pk_indices(&self) -> &[usize] {
340        &self.pk_indices
341    }
342
343    pub fn output_indices(&self) -> &[usize] {
344        &self.output_indices
345    }
346
347    /// Get the indices of the primary key columns in the output columns.
348    ///
349    /// Returns `None` if any of the primary key columns is not in the output columns.
350    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
351        self.pk_indices
352            .iter()
353            .map(|&i| self.output_indices.iter().position(|&j| i == j))
354            .collect()
355    }
356
357    pub fn table_id(&self) -> TableId {
358        self.table_id
359    }
360
361    pub fn vnodes(&self) -> &Arc<Bitmap> {
362        self.distribution.vnodes()
363    }
364}
365/// Point get
366impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
367    /// Get a single row by point get
368    pub async fn get_row(
369        &self,
370        pk: impl Row,
371        wait_epoch: HummockReadEpoch,
372    ) -> StorageResult<Option<OwnedRow>> {
373        let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
374        let read_committed = wait_epoch.is_read_committed();
375        self.store
376            .try_wait_epoch(
377                wait_epoch,
378                TryWaitEpochOptions {
379                    table_id: self.table_id,
380                },
381            )
382            .await?;
383        let serialized_pk = serialize_pk_with_vnode(
384            &pk,
385            &self.pk_serializer,
386            self.distribution.compute_vnode_by_pk(&pk),
387        );
388        assert!(pk.len() <= self.pk_indices.len());
389
390        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
391        {
392            Some(serialized_pk.slice(VirtualNode::SIZE..))
393        } else {
394            None
395        };
396
397        let read_options = ReadOptions {
398            prefix_hint,
399            retention_seconds: self.table_option.retention_seconds,
400            table_id: self.table_id,
401            read_version_from_backup: read_backup,
402            read_committed,
403            cache_policy: CachePolicy::Fill(CacheHint::Normal),
404            ..Default::default()
405        };
406        let read_snapshot = self
407            .store
408            .new_read_snapshot(
409                wait_epoch,
410                NewReadSnapshotOptions {
411                    table_id: self.table_id,
412                },
413            )
414            .await?;
415        match read_snapshot
416            .get_keyed_row(serialized_pk, read_options)
417            .await?
418        {
419            Some((full_key, value)) => {
420                let row = self.row_serde.deserialize(&value)?;
421                let result_row_in_value = self.mapping.project(OwnedRow::new(row));
422
423                match &self.key_output_indices {
424                    Some(key_output_indices) => {
425                        let result_row_in_key =
426                            pk.project(&self.output_row_in_key_indices).into_owned_row();
427                        let mut result_row_vec = vec![];
428                        for idx in &self.output_indices {
429                            if let Some(epoch_idx) = self.epoch_idx
430                                && *idx == epoch_idx
431                            {
432                                let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
433                                result_row_vec
434                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
435                            } else if self.value_output_indices.contains(idx) {
436                                let item_position_in_value_indices = &self
437                                    .value_output_indices
438                                    .iter()
439                                    .position(|p| idx == p)
440                                    .unwrap();
441                                result_row_vec.push(
442                                    result_row_in_value
443                                        .datum_at(*item_position_in_value_indices)
444                                        .to_owned_datum(),
445                                );
446                            } else {
447                                let item_position_in_pk_indices =
448                                    key_output_indices.iter().position(|p| idx == p).unwrap();
449                                result_row_vec.push(
450                                    result_row_in_key
451                                        .datum_at(item_position_in_pk_indices)
452                                        .to_owned_datum(),
453                                );
454                            }
455                        }
456                        let result_row = OwnedRow::new(result_row_vec);
457                        Ok(Some(result_row))
458                    }
459                    None => match &self.epoch_idx {
460                        Some(epoch_idx) => {
461                            let mut result_row_vec = vec![];
462                            for idx in &self.output_indices {
463                                if idx == epoch_idx {
464                                    let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
465                                    result_row_vec.push(risingwave_common::types::Datum::from(
466                                        epoch.as_scalar(),
467                                    ));
468                                } else {
469                                    let item_position_in_value_indices = &self
470                                        .value_output_indices
471                                        .iter()
472                                        .position(|p| idx == p)
473                                        .unwrap();
474                                    result_row_vec.push(
475                                        result_row_in_value
476                                            .datum_at(*item_position_in_value_indices)
477                                            .to_owned_datum(),
478                                    );
479                                }
480                            }
481                            let result_row = OwnedRow::new(result_row_vec);
482                            Ok(Some(result_row))
483                        }
484                        None => Ok(Some(result_row_in_value.into_owned_row())),
485                    },
486                }
487            }
488            _ => Ok(None),
489        }
490    }
491
492    /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap.
493    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
494    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
495        self.distribution.update_vnode_bitmap(new_vnodes)
496    }
497}
498
499/// The row iterator of the storage table.
500/// The wrapper of stream item `StorageResult<OwnedRow>` if pk is not persisted.
501impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
502    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
503        self.next().await.transpose()
504    }
505}
506
507mod merge_vnode_stream {
508
509    use bytes::Bytes;
510    use futures::{Stream, StreamExt, TryStreamExt};
511    use risingwave_hummock_sdk::key::TableKey;
512
513    use crate::error::StorageResult;
514    use crate::table::KeyedRow;
515    use crate::table::merge_sort::{NodePeek, merge_sort};
516
517    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
518        Single(RowSt),
519        Unordered(Vec<RowSt>),
520        Ordered(Vec<KeyedRowSt>),
521    }
522
523    pub(super) type MergedVnodeStream<
524        R: Send,
525        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
526        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
527    >
528    where
529        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
530    = impl Stream<Item = StorageResult<R>> + Send;
531
532    pub(super) type SortKeyType = Bytes; // TODO: may use Vec
533
534    pub(super) fn merge_stream<
535        R: Send,
536        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
537        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
538    >(
539        stream: VnodeStreamType<RowSt, KeyedRowSt>,
540    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
541    where
542        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
543    {
544        #[auto_enums::auto_enum(futures03::Stream)]
545        match stream {
546            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
547            VnodeStreamType::Unordered(streams) => futures::stream::iter(
548                streams
549                    .into_iter()
550                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
551            )
552            .flatten_unordered(1024),
553            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
554                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
555                    vnode_prefixed_key: TableKey(key),
556                    row,
557                }))
558            }))
559            .map_ok(|keyed_row| keyed_row.row),
560        }
561    }
562}
563
564use merge_vnode_stream::*;
565
566async fn build_vnode_stream<
567    R: Send,
568    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
569    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
570    RowStFut: Future<Output = StorageResult<RowSt>>,
571    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
572>(
573    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
574    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
575    vnodes: &[VirtualNode],
576    ordered: bool,
577) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
578where
579    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
580{
581    let stream = match vnodes {
582        [] => unreachable!(),
583        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
584        // Concat all iterators if not to preserve order.
585        vnodes if !ordered => VnodeStreamType::Unordered(
586            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
587        ),
588        // Merge all iterators if to preserve order.
589        vnodes => VnodeStreamType::Ordered(
590            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
591        ),
592    };
593    Ok(merge_stream(stream))
594}
595
596/// Iterators
597impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
598    /// Get multiple stream item `StorageResult<OwnedRow>` based on the specified vnodes of this table with
599    /// `vnode_hint`, and merge or concat them by given `ordered`.
600    async fn iter_with_encoded_key_range(
601        &self,
602        prefix_hint: Option<Bytes>,
603        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
604        wait_epoch: HummockReadEpoch,
605        vnode_hint: Option<VirtualNode>,
606        ordered: bool,
607        prefetch_options: PrefetchOptions,
608    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
609    {
610        let vnodes = match vnode_hint {
611            // If `vnode_hint` is set, we can only access this single vnode.
612            Some(vnode) => {
613                assert!(
614                    self.distribution.vnodes().is_set(vnode.to_index()),
615                    "vnode unset: {:?}, distribution: {:?}",
616                    vnode,
617                    self.distribution
618                );
619                vec![vnode]
620            }
621            // Otherwise, we need to access all vnodes of this table.
622            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
623        };
624
625        let read_snapshot = self
626            .store
627            .new_read_snapshot(
628                wait_epoch,
629                NewReadSnapshotOptions {
630                    table_id: self.table_id,
631                },
632            )
633            .await?;
634
635        build_vnode_stream(
636            |vnode| {
637                self.iter_vnode_with_encoded_key_range(
638                    &read_snapshot,
639                    prefix_hint.clone(),
640                    (start_bound.as_ref(), end_bound.as_ref()),
641                    wait_epoch,
642                    vnode,
643                    prefetch_options,
644                )
645            },
646            |vnode| {
647                self.iter_vnode_with_encoded_key_range(
648                    &read_snapshot,
649                    prefix_hint.clone(),
650                    (start_bound.as_ref(), end_bound.as_ref()),
651                    wait_epoch,
652                    vnode,
653                    prefetch_options,
654                )
655            },
656            &vnodes,
657            ordered,
658        )
659        .await
660    }
661
662    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
663        &self,
664        read_snapshot: &S::ReadSnapshot,
665        prefix_hint: Option<Bytes>,
666        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
667        wait_epoch: HummockReadEpoch,
668        vnode: VirtualNode,
669        prefetch_options: PrefetchOptions,
670    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
671    {
672        let cache_policy = match &encoded_key_range {
673            // To prevent unbounded range scan queries from polluting the block cache, use the
674            // low priority fill policy.
675            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CacheHint::Low),
676            _ => CachePolicy::Fill(CacheHint::Normal),
677        };
678
679        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
680
681        {
682            let prefix_hint = prefix_hint.clone();
683            let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
684            let read_committed = wait_epoch.is_read_committed();
685            {
686                let read_options = ReadOptions {
687                    prefix_hint,
688                    retention_seconds: self.table_option.retention_seconds,
689                    table_id: self.table_id,
690                    read_version_from_backup: read_backup,
691                    read_committed,
692                    prefetch_options,
693                    cache_policy,
694                };
695                let pk_serializer = match self.output_row_in_key_indices.is_empty() {
696                    true => None,
697                    false => Some(Arc::new(self.pk_serializer.clone())),
698                };
699                let iter = BatchTableInnerIterInner::new(
700                    read_snapshot,
701                    self.mapping.clone(),
702                    self.epoch_idx,
703                    pk_serializer,
704                    self.output_indices.clone(),
705                    self.key_output_indices.clone(),
706                    self.value_output_indices.clone(),
707                    self.output_row_in_key_indices.clone(),
708                    self.row_serde.clone(),
709                    table_key_range,
710                    read_options,
711                )
712                .await?
713                .into_stream::<K>();
714                Ok(iter)
715            }
716        }
717    }
718
719    // TODO: directly use `prefixed_range`.
720    fn serialize_pk_bound(
721        &self,
722        pk_prefix: impl Row,
723        range_bound: Bound<&OwnedRow>,
724        is_start_bound: bool,
725    ) -> Bound<Bytes> {
726        match range_bound {
727            Included(k) => {
728                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
729                let key = pk_prefix.chain(k);
730                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
731                if is_start_bound {
732                    Included(serialized_key)
733                } else {
734                    // Should use excluded next key for end bound.
735                    // Otherwise keys starting with the bound is not included.
736                    end_bound_of_prefix(&serialized_key)
737                }
738            }
739            Excluded(k) => {
740                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
741                let key = pk_prefix.chain(k);
742                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
743                if is_start_bound {
744                    // Storage doesn't support excluded begin key yet, so transform it to
745                    // included.
746                    // We always serialize a u8 for null of datum which is not equal to '\xff',
747                    // so we can assert that the next_key would never be empty.
748                    let next_serialized_key = next_key(&serialized_key);
749                    assert!(!next_serialized_key.is_empty());
750                    Included(Bytes::from(next_serialized_key))
751                } else {
752                    Excluded(serialized_key)
753                }
754            }
755            Unbounded => {
756                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
757                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
758                if pk_prefix.is_empty() {
759                    Unbounded
760                } else if is_start_bound {
761                    Included(serialized_pk_prefix)
762                } else {
763                    end_bound_of_prefix(&serialized_pk_prefix)
764                }
765            }
766        }
767    }
768
769    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
770    async fn iter_with_pk_bounds(
771        &self,
772        epoch: HummockReadEpoch,
773        pk_prefix: impl Row,
774        range_bounds: impl RangeBounds<OwnedRow>,
775        ordered: bool,
776        prefetch_options: PrefetchOptions,
777    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
778        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
779        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
780        assert!(pk_prefix.len() <= self.pk_indices.len());
781        let pk_prefix_indices = (0..pk_prefix.len())
782            .map(|index| self.pk_indices[index])
783            .collect_vec();
784
785        let prefix_hint = if self.read_prefix_len_hint != 0
786            && self.read_prefix_len_hint <= pk_prefix.len()
787        {
788            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
789                start_key
790            } else {
791                unreachable!()
792            };
793            let prefix_len = self
794                .pk_serializer
795                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
796            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
797        } else {
798            trace!(
799                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?}  pk_prefix_indices {:?}",
800                self.table_id, pk_prefix, pk_prefix_indices
801            );
802            None
803        };
804
805        trace!(
806            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?}  pk_prefix_indices {:?}",
807            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
808        );
809
810        self.iter_with_encoded_key_range(
811            prefix_hint,
812            (start_key, end_key),
813            epoch,
814            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
815            ordered,
816            prefetch_options,
817        )
818        .await
819    }
820
821    // Construct a stream of (columns, row_count) from a row stream
822    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
823    async fn convert_row_stream_to_array_vec_stream(
824        iter: impl Stream<Item = StorageResult<OwnedRow>>,
825        schema: Schema,
826        chunk_size: usize,
827    ) {
828        use futures::{TryStreamExt, pin_mut};
829        use risingwave_common::util::iter_util::ZipEqFast;
830
831        pin_mut!(iter);
832
833        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
834        let mut row_count = 0;
835
836        while let Some(row) = iter.try_next().await? {
837            row_count += 1;
838            // Uses ArrayBuilderImpl instead of DataChunkBuilder here to demonstrate how to build chunk in a columnar manner
839            let builders_ref =
840                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
841            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
842                builder.append(datum);
843            }
844            if row_count == chunk_size {
845                let columns: Vec<_> = builders
846                    .take()
847                    .unwrap()
848                    .into_iter()
849                    .map(|builder| builder.finish().into())
850                    .collect();
851                yield (columns, row_count);
852                assert!(builders.is_none());
853                row_count = 0;
854            }
855        }
856
857        if let Some(builders) = builders {
858            assert_gt!(row_count, 0);
859            // yield the last chunk if any
860            let columns: Vec<_> = builders
861                .into_iter()
862                .map(|builder| builder.finish().into())
863                .collect();
864            yield (columns, row_count);
865        }
866    }
867
868    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
869    /// Returns a stream of chunks of columns with the provided `chunk_size`
870    async fn chunk_iter_with_pk_bounds(
871        &self,
872        epoch: HummockReadEpoch,
873        pk_prefix: impl Row,
874        range_bounds: impl RangeBounds<OwnedRow>,
875        ordered: bool,
876        chunk_size: usize,
877        prefetch_options: PrefetchOptions,
878    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
879        let iter = self
880            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
881            .await?;
882
883        Ok(Self::convert_row_stream_to_array_vec_stream(
884            iter,
885            self.schema.clone(),
886            chunk_size,
887        ))
888    }
889
890    /// Construct a stream item `StorageResult<OwnedRow>` for batch executors.
891    /// Differs from the streaming one, this iterator will wait for the epoch before iteration
892    pub async fn batch_iter_with_pk_bounds(
893        &self,
894        epoch: HummockReadEpoch,
895        pk_prefix: impl Row,
896        range_bounds: impl RangeBounds<OwnedRow>,
897        ordered: bool,
898        prefetch_options: PrefetchOptions,
899    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
900        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
901            .await
902    }
903
904    // The returned iterator will iterate data from a snapshot corresponding to the given `epoch`.
905    pub async fn batch_iter(
906        &self,
907        epoch: HummockReadEpoch,
908        ordered: bool,
909        prefetch_options: PrefetchOptions,
910    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
911        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
912            .await
913    }
914
915    pub async fn batch_iter_vnode(
916        &self,
917        epoch: HummockReadEpoch,
918        start_pk: Option<&OwnedRow>,
919        vnode: VirtualNode,
920        prefetch_options: PrefetchOptions,
921    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
922    {
923        let start_bound = if let Some(start_pk) = start_pk {
924            let mut bytes = BytesMut::new();
925            self.pk_serializer.serialize(start_pk, &mut bytes);
926            let bytes = bytes.freeze();
927            Included(bytes)
928        } else {
929            Unbounded
930        };
931        let read_snapshot = self
932            .store
933            .new_read_snapshot(
934                epoch,
935                NewReadSnapshotOptions {
936                    table_id: self.table_id,
937                },
938            )
939            .await?;
940        Ok(self
941            .iter_vnode_with_encoded_key_range::<()>(
942                &read_snapshot,
943                None,
944                (start_bound.as_ref(), Unbounded),
945                epoch,
946                vnode,
947                prefetch_options,
948            )
949            .await?
950            .map_ok(|(_, row)| row))
951    }
952
953    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
954        self.store
955            .next_epoch(
956                epoch,
957                NextEpochOptions {
958                    table_id: self.table_id,
959                },
960            )
961            .await
962    }
963
964    async fn batch_iter_log_inner<K: CopyFromSlice>(
965        &self,
966        start_epoch: u64,
967        end_epoch: HummockReadEpoch,
968        start_pk: Option<&OwnedRow>,
969        vnode: VirtualNode,
970    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + use<K, S, SD>> {
971        let start_bound = if let Some(start_pk) = start_pk {
972            let mut bytes = BytesMut::new();
973            self.pk_serializer.serialize(start_pk, &mut bytes);
974            let bytes = bytes.freeze();
975            Included(bytes)
976        } else {
977            Unbounded
978        };
979        let table_key_range =
980            prefixed_range_with_vnode::<&Bytes>((start_bound.as_ref(), Unbounded), vnode);
981        let read_options = ReadLogOptions {
982            table_id: self.table_id,
983        };
984        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
985            &self.store,
986            self.mapping.clone(),
987            self.row_serde.clone(),
988            table_key_range,
989            read_options,
990            start_epoch,
991            end_epoch,
992        )
993        .await?
994        .into_stream::<K>();
995
996        Ok(iter)
997    }
998
999    pub async fn batch_iter_vnode_log(
1000        &self,
1001        start_epoch: u64,
1002        end_epoch: HummockReadEpoch,
1003        start_pk: Option<&OwnedRow>,
1004        vnode: VirtualNode,
1005    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + use<S, SD>> {
1006        let stream = self
1007            .batch_iter_log_inner::<()>(start_epoch, end_epoch, start_pk, vnode)
1008            .await?;
1009        Ok(stream.map_ok(|(_, row)| row))
1010    }
1011
1012    pub async fn batch_iter_log_with_pk_bounds(
1013        &self,
1014        start_epoch: u64,
1015        end_epoch: HummockReadEpoch,
1016        ordered: bool,
1017    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
1018    {
1019        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
1020        build_vnode_stream(
1021            |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
1022            |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
1023            &vnodes,
1024            ordered,
1025        )
1026        .await
1027    }
1028
1029    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
1030    /// Returns a stream of `DataChunk` with the provided `chunk_size`
1031    pub async fn batch_chunk_iter_with_pk_bounds(
1032        &self,
1033        epoch: HummockReadEpoch,
1034        pk_prefix: impl Row,
1035        range_bounds: impl RangeBounds<OwnedRow>,
1036        ordered: bool,
1037        chunk_size: usize,
1038        prefetch_options: PrefetchOptions,
1039    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
1040        let iter = self
1041            .chunk_iter_with_pk_bounds(
1042                epoch,
1043                pk_prefix,
1044                range_bounds,
1045                ordered,
1046                chunk_size,
1047                prefetch_options,
1048            )
1049            .await?;
1050
1051        Ok(iter.map(|item| {
1052            let (columns, row_count) = item?;
1053            Ok(DataChunk::new(columns, row_count))
1054        }))
1055    }
1056}
1057
1058/// [`BatchTableInnerIterInner`] iterates on the storage table.
1059struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
1060    /// An iterator that returns raw bytes from storage.
1061    iter: SI,
1062
1063    mapping: Arc<ColumnMapping>,
1064
1065    /// The index of system column `_rw_timestamp` in the output columns.
1066    epoch_idx: Option<usize>,
1067
1068    row_deserializer: Arc<SD>,
1069
1070    /// Used for serializing and deserializing the primary key.
1071    pk_serializer: Option<Arc<OrderedRowSerde>>,
1072
1073    output_indices: Vec<usize>,
1074
1075    /// the key part of `output_indices`.
1076    key_output_indices: Option<Vec<usize>>,
1077
1078    /// the value part of `output_indices`.
1079    value_output_indices: Vec<usize>,
1080
1081    /// used for deserializing key part of output row from pk.
1082    output_row_in_key_indices: Vec<usize>,
1083}
1084
1085impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
1086    /// If `wait_epoch` is true, it will wait for the given epoch to be committed before iteration.
1087    #[allow(clippy::too_many_arguments)]
1088    async fn new<S>(
1089        store: &S,
1090        mapping: Arc<ColumnMapping>,
1091        epoch_idx: Option<usize>,
1092        pk_serializer: Option<Arc<OrderedRowSerde>>,
1093        output_indices: Vec<usize>,
1094        key_output_indices: Option<Vec<usize>>,
1095        value_output_indices: Vec<usize>,
1096        output_row_in_key_indices: Vec<usize>,
1097        row_deserializer: Arc<SD>,
1098        table_key_range: TableKeyRange,
1099        read_options: ReadOptions,
1100    ) -> StorageResult<Self>
1101    where
1102        S: StateStoreRead<Iter = SI>,
1103    {
1104        let iter = store.iter(table_key_range, read_options).await?;
1105        let iter = Self {
1106            iter,
1107            mapping,
1108            epoch_idx,
1109            row_deserializer,
1110            pk_serializer,
1111            output_indices,
1112            key_output_indices,
1113            value_output_indices,
1114            output_row_in_key_indices,
1115        };
1116        Ok(iter)
1117    }
1118
1119    /// Yield a row with its primary key.
1120    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
1121    async fn into_stream<K: CopyFromSlice>(mut self) {
1122        while let Some((k, v)) = self
1123            .iter
1124            .try_next()
1125            .instrument_await("storage_table_iter_next".verbose())
1126            .await?
1127        {
1128            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
1129            let row = self.row_deserializer.deserialize(value)?;
1130            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
1131            let row = match &self.key_output_indices {
1132                Some(key_output_indices) => {
1133                    let result_row_in_key = match self.pk_serializer.clone() {
1134                        Some(pk_serializer) => {
1135                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;
1136
1137                            pk.project(&self.output_row_in_key_indices).into_owned_row()
1138                        }
1139                        None => OwnedRow::empty(),
1140                    };
1141
1142                    let mut result_row_vec = vec![];
1143                    for idx in &self.output_indices {
1144                        if let Some(epoch_idx) = self.epoch_idx
1145                            && *idx == epoch_idx
1146                        {
1147                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1148                            result_row_vec
1149                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1150                        } else if self.value_output_indices.contains(idx) {
1151                            let item_position_in_value_indices = &self
1152                                .value_output_indices
1153                                .iter()
1154                                .position(|p| idx == p)
1155                                .unwrap();
1156                            result_row_vec.push(
1157                                result_row_in_value
1158                                    .datum_at(*item_position_in_value_indices)
1159                                    .to_owned_datum(),
1160                            );
1161                        } else {
1162                            let item_position_in_pk_indices =
1163                                key_output_indices.iter().position(|p| idx == p).unwrap();
1164                            result_row_vec.push(
1165                                result_row_in_key
1166                                    .datum_at(item_position_in_pk_indices)
1167                                    .to_owned_datum(),
1168                            );
1169                        }
1170                    }
1171                    OwnedRow::new(result_row_vec)
1172                }
1173                None => match &self.epoch_idx {
1174                    Some(epoch_idx) => {
1175                        let mut result_row_vec = vec![];
1176                        for idx in &self.output_indices {
1177                            if idx == epoch_idx {
1178                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1179                                result_row_vec
1180                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1181                            } else {
1182                                let item_position_in_value_indices = &self
1183                                    .value_output_indices
1184                                    .iter()
1185                                    .position(|p| idx == p)
1186                                    .unwrap();
1187                                result_row_vec.push(
1188                                    result_row_in_value
1189                                        .datum_at(*item_position_in_value_indices)
1190                                        .to_owned_datum(),
1191                                );
1192                            }
1193                        }
1194                        OwnedRow::new(result_row_vec)
1195                    }
1196                    None => result_row_in_value.into_owned_row(),
1197                },
1198            };
1199            yield (K::copy_from_slice(table_key.as_ref()), row);
1200        }
1201    }
1202}
1203
1204/// [`BatchTableInnerIterLogInner`] iterates on the storage table.
1205struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
1206    /// An iterator that returns raw bytes from storage.
1207    iter: S::ChangeLogIter,
1208
1209    mapping: Arc<ColumnMapping>,
1210
1211    row_deserializer: Arc<SD>,
1212}
1213
1214impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
1215    /// If `wait_epoch` is true, it will wait for the given epoch to be committed before iteration.
1216    #[allow(clippy::too_many_arguments)]
1217    async fn new(
1218        store: &S,
1219        mapping: Arc<ColumnMapping>,
1220        row_deserializer: Arc<SD>,
1221        table_key_range: TableKeyRange,
1222        read_options: ReadLogOptions,
1223        start_epoch: u64,
1224        end_epoch: HummockReadEpoch,
1225    ) -> StorageResult<Self> {
1226        store
1227            .try_wait_epoch(
1228                end_epoch,
1229                TryWaitEpochOptions {
1230                    table_id: read_options.table_id,
1231                },
1232            )
1233            .await?;
1234        let iter = store
1235            .iter_log(
1236                (start_epoch, end_epoch.get_epoch()),
1237                table_key_range,
1238                read_options,
1239            )
1240            .await?;
1241        let iter = Self {
1242            iter,
1243            mapping,
1244            row_deserializer,
1245        };
1246        Ok(iter)
1247    }
1248
1249    /// Yield a row with its primary key.
1250    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
1251        self.iter.into_stream(move |(table_key, value)| {
1252            value
1253                .try_map(|value| {
1254                    let full_row = self.row_deserializer.deserialize(value)?;
1255                    let row = self
1256                        .mapping
1257                        .project(OwnedRow::new(full_row))
1258                        .into_owned_row();
1259                    Ok(row)
1260                })
1261                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
1262        })
1263    }
1264}