risingwave_storage/table/batch_table/mod.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::future::Future;
16use std::ops::Bound::{self, Excluded, Included, Unbounded};
17use std::ops::RangeBounds;
18use std::sync::Arc;
19use std::time::Duration;
20
21use await_tree::{InstrumentAwait, SpanExt};
22use bytes::{Bytes, BytesMut};
23use foyer::Hint;
24use futures::future::try_join_all;
25use futures::{Stream, StreamExt, TryStreamExt};
26use futures_async_stream::try_stream;
27use itertools::Itertools;
28use more_asserts::assert_gt;
29use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
30use risingwave_common::bitmap::Bitmap;
31use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
32use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
33use risingwave_common::row::{self, OwnedRow, Row, RowExt};
34use risingwave_common::types::{DataType, ToOwnedDatum};
35use risingwave_common::util::epoch::Epoch;
36use risingwave_common::util::row_serde::*;
37use risingwave_common::util::sort_util::OrderType;
38use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
39use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, deserialize_datum};
40use risingwave_hummock_sdk::HummockReadEpoch;
41use risingwave_hummock_sdk::key::{
42    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
43};
44use risingwave_pb::batch_plan::{PbScanRange, scan_range};
45use risingwave_pb::plan_common::StorageTableDesc;
46use tracing::trace;
47mod vector_index_reader;
48pub use vector_index_reader::VectorIndexReader;
49
50use crate::StateStore;
51use crate::error::{StorageError, StorageResult};
52use crate::hummock::CachePolicy;
53use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
54use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
55use crate::row_serde::{ColumnMapping, find_columns_by_ids};
56use crate::store::timeout_auto_rebuild::iter_with_timeout_rebuild;
57use crate::store::{
58    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
59    StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
60};
61use crate::table::merge_sort::NodePeek;
62use crate::table::{
63    ChangeLogRow, KeyedRow, TableDistribution, TableIter, should_calculate_prefix_hint,
64};
65
66pub type PkRangeBounds = (Bound<OwnedRow>, Bound<OwnedRow>);
67
68/// Primary-key scan range decoded from the batch plan scan-range protobuf.
69///
70/// This is shared by batch row scans and snapshot backfill so the protobuf decoding, full-scan
71/// default, and batch range-bound conversion stay in one place.
72#[derive(Clone, Debug)]
73pub struct PkScanRange {
74    /// Equality prefix of the primary key.
75    pub pk_prefix: OwnedRow,
76
77    /// Logical range bounds of the next primary-key column after `pk_prefix`.
78    pub range_bounds: PkRangeBounds,
79}
80
81impl PkScanRange {
82    fn pk_type_at(pk_types: &[DataType], index: usize) -> StorageResult<&DataType> {
83        pk_types.get(index).ok_or_else(|| {
84            StorageError::from(memcomparable::Error::Message(format!(
85                "invalid scan range: primary key index {} exceeds primary key length {}",
86                index,
87                pk_types.len()
88            )))
89        })
90    }
91
92    /// Decode a scan range from the protobuf representation and the primary-key column types.
93    pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> StorageResult<Self> {
94        let mut index = 0;
95        let pk_prefix = OwnedRow::new(
96            scan_range
97                .eq_conds
98                .iter()
99                .map(|v| -> StorageResult<_> {
100                    let ty = Self::pk_type_at(&pk_types, index)?;
101                    index += 1;
102                    Ok(deserialize_datum(v.as_slice(), ty)?)
103                })
104                .try_collect()?,
105        );
106        if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
107            return Ok(Self {
108                pk_prefix,
109                ..Self::full()
110            });
111        }
112
113        let build_bound =
114            |bound: &scan_range::Bound, mut index| -> StorageResult<Bound<OwnedRow>> {
115                let range_bound = OwnedRow::new(
116                    bound
117                        .value
118                        .iter()
119                        .map(|v| -> StorageResult<_> {
120                            let ty = Self::pk_type_at(&pk_types, index)?;
121                            index += 1;
122                            Ok(deserialize_datum(v.as_slice(), ty)?)
123                        })
124                        .try_collect()?,
125                );
126                if bound.inclusive {
127                    Ok(Bound::Included(range_bound))
128                } else {
129                    Ok(Bound::Excluded(range_bound))
130                }
131            };
132
133        let range_bounds = match (
134            scan_range.lower_bound.as_ref(),
135            scan_range.upper_bound.as_ref(),
136        ) {
137            (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
138            (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
139            (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
140            (None, None) => unreachable!(),
141        };
142        Ok(Self {
143            pk_prefix,
144            range_bounds,
145        })
146    }
147
148    /// Create a scan range for full table scan.
149    pub fn full() -> Self {
150        Self {
151            pk_prefix: OwnedRow::default(),
152            range_bounds: (Bound::Unbounded, Bound::Unbounded),
153        }
154    }
155
156    pub fn build_from_protobuf(
157        scan_ranges: &[PbScanRange],
158        table_desc: &StorageTableDesc,
159    ) -> StorageResult<Vec<Self>> {
160        if scan_ranges.is_empty() {
161            Ok(vec![Self::full()])
162        } else {
163            scan_ranges
164                .iter()
165                .map(|scan_range| Self::from_protobuf(scan_range, table_desc))
166                .try_collect()
167        }
168    }
169
170    pub fn from_protobuf(
171        scan_range: &PbScanRange,
172        table_desc: &StorageTableDesc,
173    ) -> StorageResult<Self> {
174        let pk_types = table_desc
175            .pk
176            .iter()
177            .map(|order| {
178                DataType::from(
179                    table_desc.columns[order.column_index as usize]
180                        .column_type
181                        .as_ref()
182                        .unwrap(),
183                )
184            })
185            .collect_vec();
186        Self::new(scan_range.clone(), pk_types)
187    }
188
189    pub fn convert_to_range_bounds<S: StateStore, SD: ValueRowSerde>(
190        self,
191        table: &BatchTableInner<S, SD>,
192    ) -> PkRangeBounds {
193        let PkScanRange {
194            pk_prefix,
195            range_bounds,
196        } = self;
197
198        // The len of a valid pk_prefix should be less than or equal pk's num.
199        let Some(order_type) = table.pk_serializer().get_order_types().get(pk_prefix.len()) else {
200            return range_bounds;
201        };
202        let (start_bound, end_bound) = if order_type.is_ascending() {
203            (range_bounds.0, range_bounds.1)
204        } else {
205            (range_bounds.1, range_bounds.0)
206        };
207
208        let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
209        let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
210
211        let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| match bound {
212            Bound::Unbounded => {
213                if other_bound_is_bounded && order_type_nulls {
214                    // `NULL`s are at the start bound side, we should exclude them to meet SQL
215                    // semantics.
216                    Bound::Excluded(OwnedRow::new(vec![None]))
217                } else {
218                    // Both start and end are unbounded, so we need to select all rows.
219                    Bound::Unbounded
220                }
221            }
222            Bound::Included(x) => Bound::Included(x),
223            Bound::Excluded(x) => Bound::Excluded(x),
224        };
225        let start_bound = build_bound(
226            end_bound_is_bounded,
227            start_bound,
228            order_type.nulls_are_first(),
229        );
230        let end_bound = build_bound(
231            start_bound_is_bounded,
232            end_bound,
233            order_type.nulls_are_last(),
234        );
235        (start_bound, end_bound)
236    }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// Value encoding of a NULL datum: a single `0` null tag, which decodes for any data type.
    fn encoded_null() -> Vec<u8> {
        vec![0]
    }

    /// A one-value protobuf bound holding a NULL.
    fn null_bound(inclusive: bool) -> scan_range::Bound {
        scan_range::Bound {
            value: vec![encoded_null()],
            inclusive,
        }
    }

    #[test]
    fn test_pk_scan_range_rejects_out_of_bounds_pk_index() {
        let scan_range = PbScanRange {
            eq_conds: vec![vec![]],
            ..Default::default()
        };
        assert!(PkScanRange::new(scan_range, vec![]).is_err());

        let scan_range = PbScanRange {
            lower_bound: Some(scan_range::Bound {
                value: vec![vec![]],
                inclusive: false,
            }),
            ..Default::default()
        };
        assert!(PkScanRange::new(scan_range, vec![]).is_err());

        // Bound values are indexed after the equality prefix, so a one-column primary key cannot
        // hold both an equality condition and a range bound.
        let scan_range = PbScanRange {
            eq_conds: vec![encoded_null()],
            lower_bound: Some(null_bound(true)),
            ..Default::default()
        };
        assert!(PkScanRange::new(scan_range, vec![DataType::Int32]).is_err());
    }

    #[test]
    fn test_pk_scan_range_without_conditions_is_full_scan() {
        let range = PkScanRange::new(PbScanRange::default(), vec![DataType::Int32]).unwrap();
        assert_eq!(range.pk_prefix.len(), 0);
        assert!(matches!(
            range.range_bounds,
            (Bound::Unbounded, Bound::Unbounded)
        ));
    }

    #[test]
    fn test_pk_scan_range_decodes_prefix_and_bound() {
        let scan_range = PbScanRange {
            eq_conds: vec![encoded_null()],
            upper_bound: Some(null_bound(false)),
            ..Default::default()
        };
        let range =
            PkScanRange::new(scan_range, vec![DataType::Int32, DataType::Int64]).unwrap();
        assert_eq!(range.pk_prefix, OwnedRow::new(vec![None]));
        assert!(matches!(range.range_bounds.0, Bound::Unbounded));
        assert!(matches!(&range.range_bounds.1, Bound::Excluded(row) if row.len() == 1));
    }
}
261
262/// [`BatchTableInner`] is the interface accessing relational data in KV(`StateStore`) with
263/// row-based encoding format, and is used in batch mode.
264#[derive(Clone)]
265pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
266    /// Id for this table.
267    table_id: TableId,
268
269    /// State store backend.
270    store: S,
271
272    /// The schema of the output columns, i.e., this table VIEWED BY some executor like
273    /// `RowSeqScanExecutor`.
274    schema: Schema,
275
276    /// Used for serializing and deserializing the primary key.
277    pk_serializer: OrderedRowSerde,
278
279    output_indices: Vec<usize>,
280
281    /// the key part of `output_indices`.
282    key_output_indices: Option<Vec<usize>>,
283
284    /// the value part of `output_indices`.
285    value_output_indices: Vec<usize>,
286
287    /// used for deserializing key part of output row from pk.
288    output_row_in_key_indices: Vec<usize>,
289
290    /// Mapping from column id to column index for deserializing the row.
291    mapping: Arc<ColumnMapping>,
292
293    /// The index of system column `_rw_timestamp` in the output columns.
294    epoch_idx: Option<usize>,
295
296    /// Row deserializer to deserialize the value in storage to a row.
297    /// The row can be either complete or partial, depending on whether the row encoding is versioned.
298    row_serde: Arc<SD>,
299
300    /// Indices of primary key.
301    /// Note that the index is based on the all columns of the table, instead of the output ones.
302    // FIXME: revisit constructions and usages.
303    pk_indices: Vec<usize>,
304
305    distribution: TableDistribution,
306
307    /// Used for catalog `table_properties`
308    table_option: TableOption,
309
310    read_prefix_len_hint: usize,
311}
312
313/// `BatchTable` will use [`EitherSerde`] as default so that we can support both versioned and
314/// non-versioned tables with the same type.
315pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;
316
317impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
318    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319        f.debug_struct("BatchTableInner").finish_non_exhaustive()
320    }
321}
322
// init
impl<S: StateStore> BatchTableInner<S, EitherSerde> {
    /// Create a [`BatchTableInner`] from `table_desc`, which describes the complete set of
    /// columns, and a subset of those columns given by `output_column_ids`.
    /// When reading from the storage table,
    /// the chunks or rows will only contain columns with the given ids (`output_column_ids`).
    /// They will be in the same order as the given `output_column_ids`.
    ///
    /// NOTE(kwannoel): The `output_column_ids` here may be slightly different
    /// from those supplied to associated executors.
    /// These `output_column_ids` may have `pk` appended, since they will be needed to scan from
    /// storage. The associated executors may not have these `pk` fields.
    ///
    /// # Panics
    ///
    /// Panics if a primary-key entry of `table_desc` has no order type (the result of
    /// `get_order_type()` is unwrapped). Also panics on the invariant violations checked in
    /// `new_inner`.
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = table_desc.table_id;
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    /// Test-only constructor that outputs the columns in `output_column_ids`.
    ///
    /// It uses a singleton distribution, the default table option, a read-prefix hint length
    /// of `0`, and the non-versioned value encoding.
    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

    /// Test-only constructor that outputs every column in `columns`, in their given order.
    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    /// Shared constructor.
    ///
    /// Resolves `output_column_ids` against `table_columns` and decides, for each output column,
    /// where it is read from:
    /// - the stored value, if its index is in `value_indices`;
    /// - otherwise the primary key, if its index is in `pk_indices`;
    /// - otherwise it is treated as the `_rw_timestamp` system column.
    ///
    /// It then builds the value deserializer and the matching column mapping.
    ///
    /// # Panics
    ///
    /// - If `order_types` and `pk_indices` differ in length.
    /// - If more than one output column falls into the system-column case.
    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        // system column currently only contains `_rw_timestamp`
        let mut epoch_idx = None;

        // Value columns are checked first, so a column that appears in both `value_indices` and
        // `pk_indices` is read from the value.
        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        // Position of every key output column inside the primary key; used to project the
        // caller-supplied pk row.
        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                // Column-aware (versioned) encoding: deserialize only the distinct requested
                // value columns. `mapping` then expands them back into `value_output_indices`
                // order, duplicates included.
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                // Basic (non-versioned) encoding: the deserializer covers all of
                // `value_indices`, and `mapping` selects the requested ones by position.
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        // `None` signals that no output column needs to be recovered from the key.
        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}
530
531impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
532    pub fn pk_serializer(&self) -> &OrderedRowSerde {
533        &self.pk_serializer
534    }
535
536    pub fn schema(&self) -> &Schema {
537        &self.schema
538    }
539
540    pub fn pk_indices(&self) -> &[usize] {
541        &self.pk_indices
542    }
543
544    pub fn output_indices(&self) -> &[usize] {
545        &self.output_indices
546    }
547
548    /// Get the indices of the primary key columns in the output columns.
549    ///
550    /// Returns `None` if any of the primary key columns is not in the output columns.
551    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
552        self.pk_indices
553            .iter()
554            .map(|&i| self.output_indices.iter().position(|&j| i == j))
555            .collect()
556    }
557
558    pub fn table_id(&self) -> TableId {
559        self.table_id
560    }
561
562    pub fn vnodes(&self) -> &Arc<Bitmap> {
563        self.distribution.vnodes()
564    }
565}
566/// Point get
567impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
568    /// Get a single row by point get
569    pub async fn get_row(
570        &self,
571        pk: impl Row,
572        wait_epoch: HummockReadEpoch,
573    ) -> StorageResult<Option<OwnedRow>> {
574        self.store
575            .try_wait_epoch(
576                wait_epoch,
577                TryWaitEpochOptions {
578                    table_id: self.table_id,
579                },
580            )
581            .await?;
582        let serialized_pk = serialize_pk_with_vnode(
583            &pk,
584            &self.pk_serializer,
585            self.distribution.compute_vnode_by_pk(&pk),
586        );
587        assert!(pk.len() <= self.pk_indices.len());
588
589        let prefix_hint =
590            if should_calculate_prefix_hint(self.read_prefix_len_hint, pk.len(), false) {
591                Some(serialized_pk.slice(VirtualNode::SIZE..))
592            } else {
593                None
594            };
595
596        let read_options = ReadOptions {
597            prefix_hint,
598            cache_policy: CachePolicy::Fill(Hint::Normal),
599            ..Default::default()
600        };
601        let read_snapshot = self
602            .store
603            .new_read_snapshot(
604                wait_epoch,
605                NewReadSnapshotOptions {
606                    table_id: self.table_id,
607                    table_option: self.table_option,
608                },
609            )
610            .await?;
611        match read_snapshot
612            .on_key_value(serialized_pk, read_options, move |key, value| {
613                let row = self.row_serde.deserialize(value)?;
614                Ok((key.epoch_with_gap.pure_epoch(), row))
615            })
616            .await?
617        {
618            Some((epoch, row)) => {
619                let result_row_in_value = self.mapping.project(OwnedRow::new(row));
620
621                match &self.key_output_indices {
622                    Some(key_output_indices) => {
623                        let result_row_in_key =
624                            pk.project(&self.output_row_in_key_indices).into_owned_row();
625                        let mut result_row_vec = vec![];
626                        for idx in &self.output_indices {
627                            if let Some(epoch_idx) = self.epoch_idx
628                                && *idx == epoch_idx
629                            {
630                                let epoch = Epoch::from(epoch);
631                                result_row_vec
632                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
633                            } else if self.value_output_indices.contains(idx) {
634                                let item_position_in_value_indices = &self
635                                    .value_output_indices
636                                    .iter()
637                                    .position(|p| idx == p)
638                                    .unwrap();
639                                result_row_vec.push(
640                                    result_row_in_value
641                                        .datum_at(*item_position_in_value_indices)
642                                        .to_owned_datum(),
643                                );
644                            } else {
645                                let item_position_in_pk_indices =
646                                    key_output_indices.iter().position(|p| idx == p).unwrap();
647                                result_row_vec.push(
648                                    result_row_in_key
649                                        .datum_at(item_position_in_pk_indices)
650                                        .to_owned_datum(),
651                                );
652                            }
653                        }
654                        let result_row = OwnedRow::new(result_row_vec);
655                        Ok(Some(result_row))
656                    }
657                    None => match &self.epoch_idx {
658                        Some(epoch_idx) => {
659                            let mut result_row_vec = vec![];
660                            for idx in &self.output_indices {
661                                if idx == epoch_idx {
662                                    let epoch = Epoch::from(epoch);
663                                    result_row_vec.push(risingwave_common::types::Datum::from(
664                                        epoch.as_scalar(),
665                                    ));
666                                } else {
667                                    let item_position_in_value_indices = &self
668                                        .value_output_indices
669                                        .iter()
670                                        .position(|p| idx == p)
671                                        .unwrap();
672                                    result_row_vec.push(
673                                        result_row_in_value
674                                            .datum_at(*item_position_in_value_indices)
675                                            .to_owned_datum(),
676                                    );
677                                }
678                            }
679                            let result_row = OwnedRow::new(result_row_vec);
680                            Ok(Some(result_row))
681                        }
682                        None => Ok(Some(result_row_in_value.into_owned_row())),
683                    },
684                }
685            }
686            _ => Ok(None),
687        }
688    }
689
690    /// Update the vnode bitmap of the storage table, returns the previous vnode bitmap.
691    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
692    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
693        self.distribution.update_vnode_bitmap(new_vnodes)
694    }
695}
696
697/// The row iterator of the storage table.
698/// The wrapper of stream item `StorageResult<OwnedRow>` if pk is not persisted.
699impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
700    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
701        self.next().await.transpose()
702    }
703}
704
/// Helpers that combine the per-vnode row streams of a table into a single row stream.
mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    /// How a set of per-vnode streams should be combined by `merge_stream`.
    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        /// Exactly one stream; its rows are forwarded in order.
        Single(RowSt),
        /// Several streams whose rows may be interleaved in any order.
        Unordered(Vec<RowSt>),
        /// Several keyed streams that are combined with `merge_sort` over their keys.
        Ordered(Vec<KeyedRowSt>),
    }

    /// Opaque type of the combined row stream returned by `merge_stream`.
    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    /// Key carried by ordered per-vnode streams; it is wrapped as the vnode-prefixed table key.
    pub(super) type SortKeyType = Bytes; // TODO: may use Vec

    /// Combine the streams in `stream` into one stream of rows, discarding the per-row keys.
    #[define_opaque(MergedVnodeStream)]
    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        // `auto_enum` unifies the different concrete stream types of the arms below.
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            // Inner streams are polled concurrently (up to 1024 at a time per
            // `flatten_unordered`), so rows arrive in no particular order.
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            // Each row is wrapped in a `KeyedRow` so `merge_sort` can order rows across streams
            // by their vnode-prefixed key; the key is dropped afterwards.
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}
762
763use merge_vnode_stream::*;
764
765async fn build_vnode_stream<
766    R: Send,
767    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
768    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
769    RowStFut: Future<Output = StorageResult<RowSt>>,
770    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
771>(
772    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
773    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
774    vnodes: &[VirtualNode],
775    ordered: bool,
776) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
777where
778    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
779{
780    let stream = match vnodes {
781        [] => unreachable!(),
782        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
783        // Concat all iterators if not to preserve order.
784        vnodes if !ordered => VnodeStreamType::Unordered(
785            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
786        ),
787        // Merge all iterators if to preserve order.
788        vnodes => VnodeStreamType::Ordered(
789            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
790        ),
791    };
792    Ok(merge_stream(stream))
793}
794
795/// Iterators
796impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
797    /// Get multiple stream item `StorageResult<OwnedRow>` based on the specified vnodes of this table with
798    /// `vnode_hint`, and merge or concat them by given `ordered`.
799    async fn iter_with_encoded_key_range(
800        &self,
801        prefix_hint: Option<Bytes>,
802        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
803        wait_epoch: HummockReadEpoch,
804        vnode_hint: Option<VirtualNode>,
805        ordered: bool,
806        prefetch_options: PrefetchOptions,
807    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
808    {
809        let vnodes = match vnode_hint {
810            // If `vnode_hint` is set, we can only access this single vnode.
811            Some(vnode) => {
812                assert!(
813                    self.distribution.vnodes().is_set(vnode.to_index()),
814                    "vnode unset: {:?}, distribution: {:?}",
815                    vnode,
816                    self.distribution
817                );
818                vec![vnode]
819            }
820            // Otherwise, we need to access all vnodes of this table.
821            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
822        };
823
824        let read_snapshot = self
825            .store
826            .new_read_snapshot(
827                wait_epoch,
828                NewReadSnapshotOptions {
829                    table_id: self.table_id,
830                    table_option: self.table_option,
831                },
832            )
833            .await?;
834
835        build_vnode_stream(
836            |vnode| {
837                self.iter_vnode_with_encoded_key_range(
838                    &read_snapshot,
839                    prefix_hint.clone(),
840                    (start_bound.as_ref(), end_bound.as_ref()),
841                    vnode,
842                    prefetch_options,
843                )
844            },
845            |vnode| {
846                self.iter_vnode_with_encoded_key_range(
847                    &read_snapshot,
848                    prefix_hint.clone(),
849                    (start_bound.as_ref(), end_bound.as_ref()),
850                    vnode,
851                    prefetch_options,
852                )
853            },
854            &vnodes,
855            ordered,
856        )
857        .await
858    }
859
860    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
861        &self,
862        read_snapshot: &S::ReadSnapshot,
863        prefix_hint: Option<Bytes>,
864        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
865        vnode: VirtualNode,
866        prefetch_options: PrefetchOptions,
867    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
868    {
869        let (table_key_range, read_options, pk_serializer) =
870            self.vnode_read_context(prefix_hint, encoded_key_range, vnode, prefetch_options);
871
872        let iter = read_snapshot.iter(table_key_range, read_options).await?;
873        Ok(self.iter_stream_from_state_store_iter::<K, _>(iter, pk_serializer))
874    }
875
876    fn vnode_read_context(
877        &self,
878        prefix_hint: Option<Bytes>,
879        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
880        vnode: VirtualNode,
881        prefetch_options: PrefetchOptions,
882    ) -> (TableKeyRange, ReadOptions, Option<Arc<OrderedRowSerde>>) {
883        let cache_policy = match &encoded_key_range {
884            // To prevent unbounded range scan queries from polluting the block cache, use the
885            // low priority fill policy.
886            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
887            _ => CachePolicy::Fill(Hint::Normal),
888        };
889
890        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
891        {
892            {
893                let read_options = ReadOptions {
894                    prefix_hint,
895                    prefetch_options,
896                    cache_policy,
897                };
898                let pk_serializer = match self.output_row_in_key_indices.is_empty() {
899                    true => None,
900                    false => Some(Arc::new(self.pk_serializer.clone())),
901                };
902
903                (table_key_range, read_options, pk_serializer)
904            }
905        }
906    }
907
908    fn iter_stream_from_state_store_iter<K: CopyFromSlice, SI: StateStoreIter + Send>(
909        &self,
910        iter: SI,
911        pk_serializer: Option<Arc<OrderedRowSerde>>,
912    ) -> impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, SI, S, SD> {
913        BatchTableInnerIterInner {
914            iter,
915            mapping: self.mapping.clone(),
916            epoch_idx: self.epoch_idx,
917            row_deserializer: self.row_serde.clone(),
918            pk_serializer,
919            output_indices: self.output_indices.clone(),
920            key_output_indices: self.key_output_indices.clone(),
921            value_output_indices: self.value_output_indices.clone(),
922            output_row_in_key_indices: self.output_row_in_key_indices.clone(),
923        }
924        .into_stream::<K>()
925    }
926
927    // TODO: directly use `prefixed_range`.
928    fn serialize_pk_bound(
929        &self,
930        pk_prefix: impl Row,
931        range_bound: Bound<&OwnedRow>,
932        is_start_bound: bool,
933    ) -> Bound<Bytes> {
934        match range_bound {
935            Included(k) => {
936                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
937                let key = pk_prefix.chain(k);
938                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
939                if is_start_bound {
940                    Included(serialized_key)
941                } else {
942                    // Should use excluded next key for end bound.
943                    // Otherwise keys starting with the bound is not included.
944                    end_bound_of_prefix(&serialized_key)
945                }
946            }
947            Excluded(k) => {
948                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
949                let key = pk_prefix.chain(k);
950                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
951                if is_start_bound {
952                    // Storage doesn't support excluded begin key yet, so transform it to
953                    // included.
954                    // We always serialize a u8 for null of datum which is not equal to '\xff',
955                    // so we can assert that the next_key would never be empty.
956                    let next_serialized_key = next_key(&serialized_key);
957                    assert!(!next_serialized_key.is_empty());
958                    Included(Bytes::from(next_serialized_key))
959                } else {
960                    Excluded(serialized_key)
961                }
962            }
963            Unbounded => {
964                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
965                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
966                if pk_prefix.is_empty() {
967                    Unbounded
968                } else if is_start_bound {
969                    Included(serialized_pk_prefix)
970                } else {
971                    end_bound_of_prefix(&serialized_pk_prefix)
972                }
973            }
974        }
975    }
976
977    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
978    async fn iter_with_pk_bounds(
979        &self,
980        epoch: HummockReadEpoch,
981        pk_prefix: impl Row,
982        range_bounds: impl RangeBounds<OwnedRow>,
983        ordered: bool,
984        prefetch_options: PrefetchOptions,
985    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
986        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
987        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
988        assert!(pk_prefix.len() <= self.pk_indices.len());
989        let pk_prefix_indices = (0..pk_prefix.len())
990            .map(|index| self.pk_indices[index])
991            .collect_vec();
992
993        let prefix_hint = if should_calculate_prefix_hint(
994            self.read_prefix_len_hint,
995            pk_prefix.len(),
996            true,
997        ) {
998            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
999                start_key
1000            } else {
1001                unreachable!()
1002            };
1003            let prefix_len = self
1004                .pk_serializer
1005                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
1006            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
1007        } else {
1008            trace!(
1009                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?}  pk_prefix_indices {:?}",
1010                self.table_id, pk_prefix, pk_prefix_indices
1011            );
1012            None
1013        };
1014
1015        trace!(
1016            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?}  pk_prefix_indices {:?}",
1017            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
1018        );
1019
1020        self.iter_with_encoded_key_range(
1021            prefix_hint,
1022            (start_key, end_key),
1023            epoch,
1024            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
1025            ordered,
1026            prefetch_options,
1027        )
1028        .await
1029    }
1030
1031    // Construct a stream of (columns, row_count) from a row stream
1032    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
1033    async fn convert_row_stream_to_array_vec_stream(
1034        iter: impl Stream<Item = StorageResult<OwnedRow>>,
1035        schema: Schema,
1036        chunk_size: usize,
1037    ) {
1038        use futures::{TryStreamExt, pin_mut};
1039        use risingwave_common::util::iter_util::ZipEqFast;
1040
1041        pin_mut!(iter);
1042
1043        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
1044        let mut row_count = 0;
1045
1046        while let Some(row) = iter.try_next().await? {
1047            row_count += 1;
1048            // Uses ArrayBuilderImpl instead of DataChunkBuilder here to demonstrate how to build chunk in a columnar manner
1049            let builders_ref =
1050                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
1051            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
1052                builder.append(datum);
1053            }
1054            if row_count == chunk_size {
1055                let columns: Vec<_> = builders
1056                    .take()
1057                    .unwrap()
1058                    .into_iter()
1059                    .map(|builder| builder.finish().into())
1060                    .collect();
1061                yield (columns, row_count);
1062                assert!(builders.is_none());
1063                row_count = 0;
1064            }
1065        }
1066
1067        if let Some(builders) = builders {
1068            assert_gt!(row_count, 0);
1069            // yield the last chunk if any
1070            let columns: Vec<_> = builders
1071                .into_iter()
1072                .map(|builder| builder.finish().into())
1073                .collect();
1074            yield (columns, row_count);
1075        }
1076    }
1077
1078    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
1079    /// Returns a stream of chunks of columns with the provided `chunk_size`
1080    async fn chunk_iter_with_pk_bounds(
1081        &self,
1082        epoch: HummockReadEpoch,
1083        pk_prefix: impl Row,
1084        range_bounds: impl RangeBounds<OwnedRow>,
1085        ordered: bool,
1086        chunk_size: usize,
1087        prefetch_options: PrefetchOptions,
1088    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
1089        let iter = self
1090            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
1091            .await?;
1092
1093        Ok(Self::convert_row_stream_to_array_vec_stream(
1094            iter,
1095            self.schema.clone(),
1096            chunk_size,
1097        ))
1098    }
1099
1100    /// Construct a stream item `StorageResult<OwnedRow>` for batch executors.
1101    /// Differs from the streaming one, this iterator will wait for the epoch before iteration
1102    pub async fn batch_iter_with_pk_bounds(
1103        &self,
1104        epoch: HummockReadEpoch,
1105        pk_prefix: impl Row,
1106        range_bounds: impl RangeBounds<OwnedRow>,
1107        ordered: bool,
1108        prefetch_options: PrefetchOptions,
1109    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
1110        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
1111            .await
1112    }
1113
1114    // The returned iterator will iterate data from a snapshot corresponding to the given `epoch`.
1115    pub async fn batch_iter(
1116        &self,
1117        epoch: HummockReadEpoch,
1118        ordered: bool,
1119        prefetch_options: PrefetchOptions,
1120    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
1121        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
1122            .await
1123    }
1124
1125    fn start_bound_from_pk(&self, start_pk: Option<&OwnedRow>) -> Bound<Bytes> {
1126        if let Some(start_pk) = start_pk {
1127            let mut bytes = BytesMut::new();
1128            self.pk_serializer.serialize(start_pk, &mut bytes);
1129            let bytes = bytes.freeze();
1130            Included(bytes)
1131        } else {
1132            Unbounded
1133        }
1134    }
1135
1136    pub async fn batch_iter_vnode(
1137        &self,
1138        epoch: HummockReadEpoch,
1139        start_pk: Option<&OwnedRow>,
1140        vnode: VirtualNode,
1141        prefetch_options: PrefetchOptions,
1142        rebuild_interval: Duration,
1143    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
1144    {
1145        let full_pk_prefix = OwnedRow::default();
1146        let full_range = (Bound::Unbounded, Bound::Unbounded);
1147        self.batch_iter_vnode_with_pk_range(
1148            epoch,
1149            start_pk,
1150            &full_pk_prefix,
1151            &full_range,
1152            vnode,
1153            prefetch_options,
1154            rebuild_interval,
1155        )
1156        .await
1157    }
1158
1159    /// Iterates data from the given vnode, optionally restricted by a primary-key
1160    /// equality prefix and range bounds on the next primary-key column.
1161    pub async fn batch_iter_vnode_with_pk_range(
1162        &self,
1163        epoch: HummockReadEpoch,
1164        start_pk: Option<&OwnedRow>,
1165        pk_prefix: &OwnedRow,
1166        range_bounds: &(Bound<OwnedRow>, Bound<OwnedRow>),
1167        vnode: VirtualNode,
1168        prefetch_options: PrefetchOptions,
1169        rebuild_interval: Duration,
1170    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
1171    {
1172        assert!(
1173            !rebuild_interval.is_zero(),
1174            "rebuild_interval should be positive"
1175        );
1176
1177        let normalized_range_bounds = PkScanRange {
1178            pk_prefix: pk_prefix.clone(),
1179            range_bounds: range_bounds.clone(),
1180        }
1181        .convert_to_range_bounds(self);
1182
1183        let start_key = if let Some(start_pk) = start_pk {
1184            self.start_bound_from_pk(Some(start_pk))
1185        } else {
1186            self.serialize_pk_bound(pk_prefix, normalized_range_bounds.start_bound(), true)
1187        };
1188        let end_key =
1189            self.serialize_pk_bound(pk_prefix, normalized_range_bounds.end_bound(), false);
1190
1191        let prefix_hint =
1192            if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint <= pk_prefix.len() {
1193                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
1194                let serialized_pk_prefix = serialize_pk(pk_prefix, &pk_prefix_serializer);
1195                let prefix_len = self
1196                    .pk_serializer
1197                    .deserialize_prefix_len(&serialized_pk_prefix, self.read_prefix_len_hint)?;
1198                Some(Bytes::from(serialized_pk_prefix[..prefix_len].to_vec()))
1199            } else {
1200                None
1201            };
1202
1203        let snapshot = Arc::new(
1204            self.store
1205                .new_read_snapshot(
1206                    epoch,
1207                    NewReadSnapshotOptions {
1208                        table_id: self.table_id,
1209                        table_option: self.table_option,
1210                    },
1211                )
1212                .await?,
1213        );
1214        let (table_key_range, read_options, pk_serializer) = self.vnode_read_context(
1215            prefix_hint,
1216            (start_key.as_ref(), end_key.as_ref()),
1217            vnode,
1218            prefetch_options,
1219        );
1220        let iter = iter_with_timeout_rebuild(
1221            snapshot,
1222            table_key_range,
1223            self.table_id,
1224            read_options,
1225            rebuild_interval,
1226        )
1227        .await?;
1228        let iter = self.iter_stream_from_state_store_iter::<(), _>(iter, pk_serializer);
1229        Ok(iter.map_ok(|(_, row)| row))
1230    }
1231
1232    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
1233        self.store
1234            .next_epoch(
1235                epoch,
1236                NextEpochOptions {
1237                    table_id: self.table_id,
1238                },
1239            )
1240            .await
1241    }
1242
1243    pub async fn batch_iter_vnode_log(
1244        &self,
1245        start_epoch: u64,
1246        end_epoch: HummockReadEpoch,
1247        start_pk: Option<&OwnedRow>,
1248        vnode: VirtualNode,
1249    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
1250    {
1251        let start_bound = self.start_bound_from_pk(start_pk);
1252        let stream = self
1253            .batch_iter_log_inner::<()>(
1254                start_epoch,
1255                end_epoch,
1256                (start_bound.as_ref(), Unbounded),
1257                vnode,
1258            )
1259            .await?;
1260        Ok(stream.map_ok(|(_, row)| row))
1261    }
1262
1263    pub async fn batch_iter_log_with_pk_bounds(
1264        &self,
1265        start_epoch: u64,
1266        end_epoch: HummockReadEpoch,
1267        ordered: bool,
1268        range_bounds: impl RangeBounds<OwnedRow>,
1269        pk_prefix: impl Row,
1270    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
1271        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
1272        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
1273        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
1274        build_vnode_stream(
1275            |vnode| {
1276                self.batch_iter_log_inner(
1277                    start_epoch,
1278                    end_epoch,
1279                    (start_key.as_ref(), end_key.as_ref()),
1280                    vnode,
1281                )
1282            },
1283            |vnode| {
1284                self.batch_iter_log_inner(
1285                    start_epoch,
1286                    end_epoch,
1287                    (start_key.as_ref(), end_key.as_ref()),
1288                    vnode,
1289                )
1290            },
1291            &vnodes,
1292            ordered,
1293        )
1294        .await
1295    }
1296
1297    async fn batch_iter_log_inner<K: CopyFromSlice>(
1298        &self,
1299        start_epoch: u64,
1300        end_epoch: HummockReadEpoch,
1301        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
1302        vnode: VirtualNode,
1303    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
1304    {
1305        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
1306        let read_options = ReadLogOptions {
1307            table_id: self.table_id,
1308        };
1309        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
1310            &self.store,
1311            self.mapping.clone(),
1312            self.row_serde.clone(),
1313            table_key_range,
1314            read_options,
1315            start_epoch,
1316            end_epoch,
1317        )
1318        .await?
1319        .into_stream::<K>();
1320
1321        Ok(iter)
1322    }
1323
1324    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
1325    /// Returns a stream of `DataChunk` with the provided `chunk_size`
1326    pub async fn batch_chunk_iter_with_pk_bounds(
1327        &self,
1328        epoch: HummockReadEpoch,
1329        pk_prefix: impl Row,
1330        range_bounds: impl RangeBounds<OwnedRow>,
1331        ordered: bool,
1332        chunk_size: usize,
1333        prefetch_options: PrefetchOptions,
1334    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
1335        let iter = self
1336            .chunk_iter_with_pk_bounds(
1337                epoch,
1338                pk_prefix,
1339                range_bounds,
1340                ordered,
1341                chunk_size,
1342                prefetch_options,
1343            )
1344            .await?;
1345
1346        Ok(iter.map(|item| {
1347            let (columns, row_count) = item?;
1348            Ok(DataChunk::new(columns, row_count))
1349        }))
1350    }
1351}
1352
/// [`BatchTableInnerIterInner`] iterates on the storage table.
///
/// It wraps a raw state store iterator and turns every key/value entry into an output row. The
/// row combines columns taken from the decoded value, columns decoded from the primary key, and
/// the `_rw_timestamp` epoch column, in the order given by `output_indices`.
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: SI,

    /// Projection applied to the deserialized value row to obtain the value columns.
    mapping: Arc<ColumnMapping>,

    /// The index of system column `_rw_timestamp` in the output columns.
    epoch_idx: Option<usize>,

    /// Deserializes the raw value bytes of each entry into a row.
    row_deserializer: Arc<SD>,

    /// Used for serializing and deserializing the primary key.
    /// When `None`, the key is not decoded.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    /// Column indices of every output row, in output order. Each one is matched against
    /// `epoch_idx`, `value_output_indices` and `key_output_indices`.
    output_indices: Vec<usize>,

    /// the key part of `output_indices`.
    key_output_indices: Option<Vec<usize>>,

    /// the value part of `output_indices`.
    value_output_indices: Vec<usize>,

    /// used for deserializing key part of output row from pk.
    output_row_in_key_indices: Vec<usize>,
}
1379
1380impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
1381    /// Yield a row with its primary key.
1382    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
1383    async fn into_stream<K: CopyFromSlice>(mut self) {
1384        while let Some((k, v)) = self
1385            .iter
1386            .try_next()
1387            .instrument_await("storage_table_iter_next".verbose())
1388            .await?
1389        {
1390            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
1391            let row = self.row_deserializer.deserialize(value)?;
1392            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
1393            let row = match &self.key_output_indices {
1394                Some(key_output_indices) => {
1395                    let result_row_in_key = match self.pk_serializer.clone() {
1396                        Some(pk_serializer) => {
1397                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;
1398
1399                            pk.project(&self.output_row_in_key_indices).into_owned_row()
1400                        }
1401                        None => OwnedRow::empty(),
1402                    };
1403
1404                    let mut result_row_vec = vec![];
1405                    for idx in &self.output_indices {
1406                        if let Some(epoch_idx) = self.epoch_idx
1407                            && *idx == epoch_idx
1408                        {
1409                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1410                            result_row_vec
1411                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1412                        } else if self.value_output_indices.contains(idx) {
1413                            let item_position_in_value_indices = &self
1414                                .value_output_indices
1415                                .iter()
1416                                .position(|p| idx == p)
1417                                .unwrap();
1418                            result_row_vec.push(
1419                                result_row_in_value
1420                                    .datum_at(*item_position_in_value_indices)
1421                                    .to_owned_datum(),
1422                            );
1423                        } else {
1424                            let item_position_in_pk_indices =
1425                                key_output_indices.iter().position(|p| idx == p).unwrap();
1426                            result_row_vec.push(
1427                                result_row_in_key
1428                                    .datum_at(item_position_in_pk_indices)
1429                                    .to_owned_datum(),
1430                            );
1431                        }
1432                    }
1433                    OwnedRow::new(result_row_vec)
1434                }
1435                None => match &self.epoch_idx {
1436                    Some(epoch_idx) => {
1437                        let mut result_row_vec = vec![];
1438                        for idx in &self.output_indices {
1439                            if idx == epoch_idx {
1440                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1441                                result_row_vec
1442                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1443                            } else {
1444                                let item_position_in_value_indices = &self
1445                                    .value_output_indices
1446                                    .iter()
1447                                    .position(|p| idx == p)
1448                                    .unwrap();
1449                                result_row_vec.push(
1450                                    result_row_in_value
1451                                        .datum_at(*item_position_in_value_indices)
1452                                        .to_owned_datum(),
1453                                );
1454                            }
1455                        }
1456                        OwnedRow::new(result_row_vec)
1457                    }
1458                    None => result_row_in_value.into_owned_row(),
1459                },
1460            };
1461            yield (K::copy_from_slice(table_key.as_ref()), row);
1462        }
1463    }
1464}
1465
/// [`BatchTableInnerIterLogInner`] iterates on the change log of the storage table.
///
/// Every log entry's row payload is deserialized and projected through `mapping`.
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: S::ChangeLogIter,

    /// Projection applied to each deserialized row.
    mapping: Arc<ColumnMapping>,

    /// Deserializes raw value bytes into a row.
    row_deserializer: Arc<SD>,
}
1475
1476impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
1477    /// If `wait_epoch` is true, it will wait for the given epoch to be committed before iteration.
1478    #[allow(clippy::too_many_arguments)]
1479    async fn new(
1480        store: &S,
1481        mapping: Arc<ColumnMapping>,
1482        row_deserializer: Arc<SD>,
1483        table_key_range: TableKeyRange,
1484        read_options: ReadLogOptions,
1485        start_epoch: u64,
1486        end_epoch: HummockReadEpoch,
1487    ) -> StorageResult<Self> {
1488        store
1489            .try_wait_epoch(
1490                end_epoch,
1491                TryWaitEpochOptions {
1492                    table_id: read_options.table_id,
1493                },
1494            )
1495            .await?;
1496        let iter = store
1497            .iter_log(
1498                (start_epoch, end_epoch.get_epoch()),
1499                table_key_range,
1500                read_options,
1501            )
1502            .await?;
1503        let iter = Self {
1504            iter,
1505            mapping,
1506            row_deserializer,
1507        };
1508        Ok(iter)
1509    }
1510
1511    /// Yield a row with its primary key.
1512    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
1513        self.iter.into_stream(move |(table_key, value)| {
1514            value
1515                .try_map(|value| {
1516                    let full_row = self.row_deserializer.deserialize(value)?;
1517                    let row = self
1518                        .mapping
1519                        .project(OwnedRow::new(full_row))
1520                        .into_owned_row();
1521                    Ok(row)
1522                })
1523                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
1524        })
1525    }
1526}