risingwave_storage/table/batch_table/mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::{Bytes, BytesMut};
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
};
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;
mod vector_index_reader;
pub use vector_index_reader::VectorIndexReader;

use crate::StateStore;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{ColumnMapping, find_columns_by_ids};
use crate::store::timeout_auto_rebuild::iter_with_timeout_rebuild;
use crate::store::{
    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
    StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
/// [`BatchTableInner`] is the interface for accessing relational data in the KV state store
/// (`StateStore`) with a row-based encoding format, and is used in batch mode.
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id for this table.
    table_id: TableId,

    /// State store backend.
    store: S,

    /// The schema of the output columns, i.e., the schema of this table as viewed by some
    /// executor like `RowSeqScanExecutor`.
    schema: Schema,

    /// Used for serializing and deserializing the primary key.
    pk_serializer: OrderedRowSerde,

    output_indices: Vec<usize>,

    /// The key part of `output_indices`.
    key_output_indices: Option<Vec<usize>>,

    /// The value part of `output_indices`.
    value_output_indices: Vec<usize>,

    /// Used for deserializing the key part of the output row from the pk.
    output_row_in_key_indices: Vec<usize>,

    /// Mapping from column id to column index for deserializing the row.
    mapping: Arc<ColumnMapping>,

    /// The index of the system column `_rw_timestamp` in the output columns.
    epoch_idx: Option<usize>,

    /// Row deserializer to deserialize the value in storage to a row.
    /// The row can be either complete or partial, depending on whether the row encoding is versioned.
    row_serde: Arc<SD>,

    /// Indices of primary key.
    /// Note that the index is based on all columns of the table, instead of the output ones.
    // FIXME: revisit constructions and usages.
    pk_indices: Vec<usize>,

    distribution: TableDistribution,

    /// Used for catalog `table_properties`.
    table_option: TableOption,

    read_prefix_len_hint: usize,
}

/// `BatchTable` will use [`EitherSerde`] as the default so that we can support both versioned and
/// non-versioned tables with the same type.
pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BatchTableInner").finish_non_exhaustive()
    }
}

// init
impl<S: StateStore> BatchTableInner<S, EitherSerde> {
    /// Creates a [`BatchTableInner`] given a complete set of `columns` and a partial
    /// set of `output_column_ids`.
    /// When reading from the storage table,
    /// the chunks or rows will only contain the columns with the given ids
    /// (`output_column_ids`), in the same order as the given `output_column_ids`.
    ///
    /// NOTE(kwannoel): The `output_column_ids` here may be slightly different
    /// from those supplied to the associated executors.
    /// These `output_column_ids` may have `pk` appended, since it is needed to scan from
    /// storage. The associated executors may not have these `pk` fields.
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = table_desc.table_id;
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

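    /// A minimal construction sketch for tests, assuming an in-memory state store and a
    /// two-column schema (hypothetical values; not a doctest):
    /// ```ignore
    /// let table = BatchTable::for_test(
    ///     MemoryStateStore::new(),
    ///     TableId::new(1),
    ///     vec![
    ///         ColumnDesc::unnamed(ColumnId::from(0), DataType::Int32), // pk column
    ///         ColumnDesc::unnamed(ColumnId::from(1), DataType::Varchar),
    ///     ],
    ///     vec![OrderType::ascending()],
    ///     vec![0],    // pk indices
    ///     vec![0, 1], // value indices
    /// );
    /// ```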
    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        // system column currently only contains `_rw_timestamp`
        let mut epoch_idx = None;

        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    pub fn pk_serializer(&self) -> &OrderedRowSerde {
        &self.pk_serializer
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn output_indices(&self) -> &[usize] {
        &self.output_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any of the primary key columns is not in the output columns.
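    ///
    /// For example (illustrative), with `pk_indices = [2, 0]` and `output_indices = [0, 2]`,
    /// this returns `Some(vec![1, 0])`; if column `2` were not in the output columns, it
    /// would return `None`.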
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }
}
/// Point get
impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Get a single row by a point get on the given primary key.
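    ///
    /// A minimal usage sketch, assuming a committed `epoch` and a single-column `i32` pk
    /// (hypothetical values; not a doctest):
    /// ```ignore
    /// let pk = OwnedRow::new(vec![Some(1i32.into())]);
    /// if let Some(row) = table.get_row(&pk, HummockReadEpoch::Committed(epoch)).await? {
    ///     // `row` contains the output columns, in `output_indices` order.
    /// }
    /// ```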
    pub async fn get_row(
        &self,
        pk: impl Row,
        wait_epoch: HummockReadEpoch,
    ) -> StorageResult<Option<OwnedRow>> {
        self.store
            .try_wait_epoch(
                wait_epoch,
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let serialized_pk = serialize_pk_with_vnode(
            &pk,
            &self.pk_serializer,
            self.distribution.compute_vnode_by_pk(&pk),
        );
        assert!(pk.len() <= self.pk_indices.len());

        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
        {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                    table_option: self.table_option,
                },
            )
            .await?;
        match read_snapshot
            .on_key_value(serialized_pk, read_options, move |key, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok((key.epoch_with_gap.pure_epoch(), row))
            })
            .await?
        {
            Some((epoch, row)) => {
                let result_row_in_value = self.mapping.project(OwnedRow::new(row));

                match &self.key_output_indices {
                    Some(key_output_indices) => {
                        let result_row_in_key =
                            pk.project(&self.output_row_in_key_indices).into_owned_row();
                        let mut result_row_vec = vec![];
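                        // Stitch the output row together: each output column is taken from
                        // the value part, the key part, or the `_rw_timestamp` epoch.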
                        for idx in &self.output_indices {
                            if let Some(epoch_idx) = self.epoch_idx
                                && *idx == epoch_idx
                            {
                                let epoch = Epoch::from(epoch);
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else if self.value_output_indices.contains(idx) {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            } else {
                                let item_position_in_pk_indices =
                                    key_output_indices.iter().position(|p| idx == p).unwrap();
                                result_row_vec.push(
                                    result_row_in_key
                                        .datum_at(item_position_in_pk_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        let result_row = OwnedRow::new(result_row_vec);
                        Ok(Some(result_row))
                    }
                    None => match &self.epoch_idx {
                        Some(epoch_idx) => {
                            let mut result_row_vec = vec![];
                            for idx in &self.output_indices {
                                if idx == epoch_idx {
                                    let epoch = Epoch::from(epoch);
                                    result_row_vec.push(risingwave_common::types::Datum::from(
                                        epoch.as_scalar(),
                                    ));
                                } else {
                                    let item_position_in_value_indices = &self
                                        .value_output_indices
                                        .iter()
                                        .position(|p| idx == p)
                                        .unwrap();
                                    result_row_vec.push(
                                        result_row_in_value
                                            .datum_at(*item_position_in_value_indices)
                                            .to_owned_datum(),
                                    );
                                }
                            }
                            let result_row = OwnedRow::new(result_row_vec);
                            Ok(Some(result_row))
                        }
                        None => Ok(Some(result_row_in_value.into_owned_row())),
                    },
                }
            }
            _ => Ok(None),
        }
    }

    /// Update the vnode bitmap of the storage table, returning the previous vnode bitmap.
    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
}

/// The row iterator of the storage table.
/// A blanket wrapper that turns any stream of `StorageResult<OwnedRow>` items into a
/// [`TableIter`].
impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
        self.next().await.transpose()
    }
}

mod merge_vnode_stream {
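    //! Helpers for combining per-vnode streams into a single stream: a single vnode's
    //! stream is passed through directly, unordered streams are flattened concurrently,
    //! and ordered streams are merge-sorted by their serialized keys.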

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        Single(RowSt),
        Unordered(Vec<RowSt>),
        Ordered(Vec<KeyedRowSt>),
    }

    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    pub(super) type SortKeyType = Bytes; // TODO: may use Vec

    #[define_opaque(MergedVnodeStream)]
    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}

use merge_vnode_stream::*;

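/// Builds a single merged stream over the given (non-empty) `vnodes`: a lone vnode yields its
/// row stream directly; otherwise the per-vnode streams are flattened without ordering, or
/// merge-sorted by encoded key if `ordered` is set.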
async fn build_vnode_stream<
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let stream = match vnodes {
        [] => unreachable!(),
        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
        // Concatenate all iterators if order need not be preserved.
        vnodes if !ordered => VnodeStreamType::Unordered(
            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
        ),
        // Merge-sort all iterators if order must be preserved.
        vnodes => VnodeStreamType::Ordered(
            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
        ),
    };
    Ok(merge_stream(stream))
}

/// Iterators
impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Gets streams of `StorageResult<OwnedRow>` items over the vnodes of this table (or over
    /// the single vnode in `vnode_hint`, if given), and merges or concatenates them according
    /// to `ordered`.
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // If `vnode_hint` is set, we can only access this single vnode.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            // Otherwise, we need to access all vnodes of this table.
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                    table_option: self.table_option,
                },
            )
            .await?;

        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
        &self,
        read_snapshot: &S::ReadSnapshot,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
    {
        let (table_key_range, read_options, pk_serializer) =
            self.vnode_read_context(prefix_hint, encoded_key_range, vnode, prefetch_options);

        let iter = read_snapshot.iter(table_key_range, read_options).await?;
        Ok(self.iter_stream_from_state_store_iter::<K, _>(iter, pk_serializer))
    }

    fn vnode_read_context(
        &self,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> (TableKeyRange, ReadOptions, Option<Arc<OrderedRowSerde>>) {
        let cache_policy = match &encoded_key_range {
            // To prevent unbounded range scan queries from polluting the block cache, use the
            // low-priority fill policy.
            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
            _ => CachePolicy::Fill(Hint::Normal),
        };

        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadOptions {
            prefix_hint,
            prefetch_options,
            cache_policy,
        };
        let pk_serializer = match self.output_row_in_key_indices.is_empty() {
            true => None,
            false => Some(Arc::new(self.pk_serializer.clone())),
        };

        (table_key_range, read_options, pk_serializer)
    }

    fn iter_stream_from_state_store_iter<K: CopyFromSlice, SI: StateStoreIter + Send>(
        &self,
        iter: SI,
        pk_serializer: Option<Arc<OrderedRowSerde>>,
    ) -> impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, SI, S, SD> {
        BatchTableInnerIterInner {
            iter,
            mapping: self.mapping.clone(),
            epoch_idx: self.epoch_idx,
            row_deserializer: self.row_serde.clone(),
            pk_serializer,
            output_indices: self.output_indices.clone(),
            key_output_indices: self.key_output_indices.clone(),
            value_output_indices: self.value_output_indices.clone(),
            output_row_in_key_indices: self.output_row_in_key_indices.clone(),
        }
        .into_stream::<K>()
    }

    // TODO: directly use `prefixed_range`.
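    /// Serializes a `pk_prefix` chained with a `range_bound` on the remaining pk columns into an
    /// encoded key bound. For example (illustrative), an included end bound `k` becomes
    /// `end_bound_of_prefix(encode(pk_prefix ++ k))`, so that longer keys sharing that prefix
    /// are still covered by the range.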
    fn serialize_pk_bound(
        &self,
        pk_prefix: impl Row,
        range_bound: Bound<&OwnedRow>,
        is_start_bound: bool,
    ) -> Bound<Bytes> {
        match range_bound {
            Included(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    Included(serialized_key)
                } else {
                    // Should use the excluded next key as the end bound.
                    // Otherwise, keys starting with the bound are not included.
                    end_bound_of_prefix(&serialized_key)
                }
            }
            Excluded(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    // Storage doesn't support an excluded begin key yet, so transform it into an
                    // included one.
                    // The null flag of a datum is always serialized as a `u8` that is not equal
                    // to `0xff`, so we can assert that `next_key` will never be empty.
                    let next_serialized_key = next_key(&serialized_key);
                    assert!(!next_serialized_key.is_empty());
                    Included(Bytes::from(next_serialized_key))
                } else {
                    Excluded(serialized_key)
                }
            }
            Unbounded => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
                if pk_prefix.is_empty() {
                    Unbounded
                } else if is_start_bound {
                    Included(serialized_pk_prefix)
                } else {
                    end_bound_of_prefix(&serialized_pk_prefix)
                }
            }
        }
    }

    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
    async fn iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        assert!(pk_prefix.len() <= self.pk_indices.len());
        let pk_prefix_indices = (0..pk_prefix.len())
            .map(|index| self.pk_indices[index])
            .collect_vec();

        let prefix_hint = if self.read_prefix_len_hint != 0
            && self.read_prefix_len_hint <= pk_prefix.len()
        {
            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
                start_key
            } else {
                unreachable!()
            };
            let prefix_len = self
                .pk_serializer
                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
        } else {
            trace!(
                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?}  pk_prefix_indices {:?}",
                self.table_id, pk_prefix, pk_prefix_indices
            );
            None
        };

        trace!(
            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?}  pk_prefix_indices {:?}",
            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
        );

        self.iter_with_encoded_key_range(
            prefix_hint,
            (start_key, end_key),
            epoch,
            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
            ordered,
            prefetch_options,
        )
        .await
    }

    // Construct a stream of (columns, row_count) from a row stream.
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // Uses `ArrayBuilderImpl` instead of `DataChunkBuilder` here to demonstrate how to
            // build a chunk in a columnar manner.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        // Yield the last partial chunk, if any.
        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }

    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
    /// Returns a stream of chunks of columns with the provided `chunk_size`.
    async fn chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
        let iter = self
            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await?;

        Ok(Self::convert_row_stream_to_array_vec_stream(
            iter,
            self.schema.clone(),
            chunk_size,
        ))
    }

    /// Constructs a stream of `StorageResult<OwnedRow>` items for batch executors.
    /// Unlike the streaming one, this iterator will wait for the epoch before iterating.
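    ///
    /// A minimal usage sketch, assuming a committed `epoch` and an `i32` first pk column
    /// (hypothetical values; not a doctest):
    /// ```ignore
    /// // Scan all rows whose first pk column equals 1, in pk order.
    /// let stream = table
    ///     .batch_iter_with_pk_bounds(
    ///         HummockReadEpoch::Committed(epoch),
    ///         OwnedRow::new(vec![Some(1i32.into())]), // pk prefix
    ///         ..,                                     // no extra range bounds
    ///         true,                                   // preserve pk order
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// ```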
    pub async fn batch_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await
    }

    /// The returned iterator will iterate data from a snapshot corresponding to the given `epoch`.
    pub async fn batch_iter(
        &self,
        epoch: HummockReadEpoch,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
            .await
    }

    fn start_bound_from_pk(&self, start_pk: Option<&OwnedRow>) -> Bound<Bytes> {
        if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        }
    }

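    /// Like [`Self::batch_iter`], but iterates a single `vnode` starting from the optional
    /// `start_pk`, rebuilding the underlying storage iterator every `rebuild_interval` so that
    /// a long-running scan does not hold one iterator for too long.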
    pub async fn batch_iter_vnode(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
        rebuild_interval: Duration,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        assert!(
            !rebuild_interval.is_zero(),
            "rebuild_interval should be positive"
        );
        let start_bound = self.start_bound_from_pk(start_pk);
        let snapshot = Arc::new(
            self.store
                .new_read_snapshot(
                    epoch,
                    NewReadSnapshotOptions {
                        table_id: self.table_id,
                        table_option: self.table_option,
                    },
                )
                .await?,
        );
        let (table_key_range, read_options, pk_serializer) = self.vnode_read_context(
            None,
            (start_bound.as_ref(), Unbounded),
            vnode,
            prefetch_options,
        );
        let iter = iter_with_timeout_rebuild(
            snapshot,
            table_key_range,
            self.table_id,
            read_options,
            rebuild_interval,
        )
        .await?;
        let iter = self.iter_stream_from_state_store_iter::<(), _>(iter, pk_serializer);
        Ok(iter.map_ok(|(_, row)| row))
    }

    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
        self.store
            .next_epoch(
                epoch,
                NextEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

    pub async fn batch_iter_vnode_log(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = self.start_bound_from_pk(start_pk);
        let stream = self
            .batch_iter_log_inner::<()>(
                start_epoch,
                end_epoch,
                (start_bound.as_ref(), Unbounded),
                vnode,
            )
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    pub async fn batch_iter_log_with_pk_bounds(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        ordered: bool,
        range_bounds: impl RangeBounds<OwnedRow>,
        pk_prefix: impl Row,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
        build_vnode_stream(
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn batch_iter_log_inner<K: CopyFromSlice>(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
    {
        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadLogOptions {
            table_id: self.table_id,
        };
        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
            &self.store,
            self.mapping.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
            start_epoch,
            end_epoch,
        )
        .await?
        .into_stream::<K>();

        Ok(iter)
    }

    /// Iterates on the table with the given prefix of the pk in `pk_prefix` and the range bounds.
    /// Returns a stream of `DataChunk`s with the provided `chunk_size`.
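    ///
    /// A minimal usage sketch for a full scan, assuming a committed `epoch` (hypothetical
    /// values; not a doctest):
    /// ```ignore
    /// let chunks = table
    ///     .batch_chunk_iter_with_pk_bounds(
    ///         HummockReadEpoch::Committed(epoch),
    ///         row::empty(), // no pk prefix: scan the whole table
    ///         ..,           // no extra range bounds
    ///         false,        // order not required
    ///         1024,         // chunk size
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// futures::pin_mut!(chunks);
    /// while let Some(chunk) = chunks.try_next().await? {
    ///     // process each `DataChunk`...
    /// }
    /// ```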
    pub async fn batch_chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
        let iter = self
            .chunk_iter_with_pk_bounds(
                epoch,
                pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                prefetch_options,
            )
            .await?;

        Ok(iter.map(|item| {
            let (columns, row_count) = item?;
            Ok(DataChunk::new(columns, row_count))
        }))
    }
}

/// [`BatchTableInnerIterInner`] iterates on the storage table.
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: SI,

    mapping: Arc<ColumnMapping>,

    /// The index of the system column `_rw_timestamp` in the output columns.
    epoch_idx: Option<usize>,

    row_deserializer: Arc<SD>,

    /// Used for serializing and deserializing the primary key.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    output_indices: Vec<usize>,

    /// The key part of `output_indices`.
    key_output_indices: Option<Vec<usize>>,

    /// The value part of `output_indices`.
    value_output_indices: Vec<usize>,

    /// Used for deserializing the key part of the output row from the pk.
    output_row_in_key_indices: Vec<usize>,
}

impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
    /// Yields a row with its primary key.
    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
    async fn into_stream<K: CopyFromSlice>(mut self) {
        while let Some((k, v)) = self
            .iter
            .try_next()
            .instrument_await("storage_table_iter_next".verbose())
            .await?
        {
            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
            let row = self.row_deserializer.deserialize(value)?;
            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
            let row = match &self.key_output_indices {
                Some(key_output_indices) => {
                    let result_row_in_key = match self.pk_serializer.clone() {
                        Some(pk_serializer) => {
                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;

                            pk.project(&self.output_row_in_key_indices).into_owned_row()
                        }
                        None => OwnedRow::empty(),
                    };

                    let mut result_row_vec = vec![];
                    for idx in &self.output_indices {
                        if let Some(epoch_idx) = self.epoch_idx
                            && *idx == epoch_idx
                        {
                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                            result_row_vec
                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                        } else if self.value_output_indices.contains(idx) {
                            let item_position_in_value_indices = &self
                                .value_output_indices
                                .iter()
                                .position(|p| idx == p)
                                .unwrap();
                            result_row_vec.push(
                                result_row_in_value
                                    .datum_at(*item_position_in_value_indices)
                                    .to_owned_datum(),
                            );
                        } else {
                            let item_position_in_pk_indices =
                                key_output_indices.iter().position(|p| idx == p).unwrap();
                            result_row_vec.push(
                                result_row_in_key
                                    .datum_at(item_position_in_pk_indices)
                                    .to_owned_datum(),
                            );
                        }
                    }
                    OwnedRow::new(result_row_vec)
                }
                None => match &self.epoch_idx {
                    Some(epoch_idx) => {
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if idx == epoch_idx {
                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        OwnedRow::new(result_row_vec)
                    }
                    None => result_row_in_value.into_owned_row(),
                },
            };
            yield (K::copy_from_slice(table_key.as_ref()), row);
        }
    }
}

/// [`BatchTableInnerIterLogInner`] iterates over the change log of the storage table.
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: S::ChangeLogIter,

    mapping: Arc<ColumnMapping>,

    row_deserializer: Arc<SD>,
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
    /// Waits for `end_epoch` to be committed before creating the change log iterator.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        store: &S,
        mapping: Arc<ColumnMapping>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadLogOptions,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
    ) -> StorageResult<Self> {
        store
            .try_wait_epoch(
                end_epoch,
                TryWaitEpochOptions {
                    table_id: read_options.table_id,
                },
            )
            .await?;
        let iter = store
            .iter_log(
                (start_epoch, end_epoch.get_epoch()),
                table_key_range,
                read_options,
            )
            .await?;
        let iter = Self {
            iter,
            mapping,
            row_deserializer,
        };
        Ok(iter)
    }

    /// Yields a row with its primary key.
    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
        self.iter.into_stream(move |(table_key, value)| {
            value
                .try_map(|value| {
                    let full_row = self.row_deserializer.deserialize(value)?;
                    let row = self
                        .mapping
                        .project(OwnedRow::new(full_row))
                        .into_owned_row();
                    Ok(row)
                })
                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
        })
    }
}