1use std::future::Future;
16use std::ops::Bound::{self, Excluded, Included, Unbounded};
17use std::ops::RangeBounds;
18use std::sync::Arc;
19
20use await_tree::{InstrumentAwait, SpanExt};
21use bytes::{Bytes, BytesMut};
22use foyer::Hint;
23use futures::future::try_join_all;
24use futures::{Stream, StreamExt, TryStreamExt};
25use futures_async_stream::try_stream;
26use itertools::Itertools;
27use more_asserts::assert_gt;
28use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
29use risingwave_common::bitmap::Bitmap;
30use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
31use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
32use risingwave_common::row::{self, OwnedRow, Row, RowExt};
33use risingwave_common::types::ToOwnedDatum;
34use risingwave_common::util::epoch::Epoch;
35use risingwave_common::util::row_serde::*;
36use risingwave_common::util::sort_util::OrderType;
37use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
38use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
39use risingwave_hummock_sdk::HummockReadEpoch;
40use risingwave_hummock_sdk::key::{
41 CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
42};
43use risingwave_pb::plan_common::StorageTableDesc;
44use tracing::trace;
45mod vector_index_reader;
46pub use vector_index_reader::VectorIndexReader;
47
48use crate::StateStore;
49use crate::error::{StorageError, StorageResult};
50use crate::hummock::CachePolicy;
51use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
52use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
53use crate::row_serde::{ColumnMapping, find_columns_by_ids};
54use crate::store::{
55 NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
56 StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
57};
58use crate::table::merge_sort::NodePeek;
59use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
60
61#[derive(Clone)]
64pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
65 table_id: TableId,
67
68 store: S,
70
71 schema: Schema,
74
75 pk_serializer: OrderedRowSerde,
77
78 output_indices: Vec<usize>,
79
80 key_output_indices: Option<Vec<usize>>,
82
83 value_output_indices: Vec<usize>,
85
86 output_row_in_key_indices: Vec<usize>,
88
89 mapping: Arc<ColumnMapping>,
91
92 epoch_idx: Option<usize>,
94
95 row_serde: Arc<SD>,
98
99 pk_indices: Vec<usize>,
103
104 distribution: TableDistribution,
105
106 table_option: TableOption,
108
109 read_prefix_len_hint: usize,
110}
111
112pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;
115
116impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
117 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118 f.debug_struct("BatchTableInner").finish_non_exhaustive()
119 }
120}
121
122impl<S: StateStore> BatchTableInner<S, EitherSerde> {
124 pub fn new_partial(
135 store: S,
136 output_column_ids: Vec<ColumnId>,
137 vnodes: Option<Arc<Bitmap>>,
138 table_desc: &StorageTableDesc,
139 ) -> Self {
140 let table_id = table_desc.table_id;
141 let column_descs = table_desc
142 .columns
143 .iter()
144 .map(ColumnDesc::from)
145 .collect_vec();
146 let order_types: Vec<OrderType> = table_desc
147 .pk
148 .iter()
149 .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
150 .collect();
151
152 let pk_indices = table_desc
153 .pk
154 .iter()
155 .map(|k| k.column_index as usize)
156 .collect_vec();
157
158 let table_option = TableOption {
159 retention_seconds: table_desc.retention_seconds,
160 };
161 let value_indices = table_desc
162 .get_value_indices()
163 .iter()
164 .map(|&k| k as usize)
165 .collect_vec();
166 let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
167 let versioned = table_desc.versioned;
168 let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);
169
170 Self::new_inner(
171 store,
172 table_id,
173 column_descs,
174 output_column_ids,
175 order_types,
176 pk_indices,
177 distribution,
178 table_option,
179 value_indices,
180 prefix_hint_len,
181 versioned,
182 )
183 }
184
185 pub fn for_test_with_partial_columns(
186 store: S,
187 table_id: TableId,
188 columns: Vec<ColumnDesc>,
189 output_column_ids: Vec<ColumnId>,
190 order_types: Vec<OrderType>,
191 pk_indices: Vec<usize>,
192 value_indices: Vec<usize>,
193 ) -> Self {
194 Self::new_inner(
195 store,
196 table_id,
197 columns,
198 output_column_ids,
199 order_types,
200 pk_indices,
201 TableDistribution::singleton(),
202 Default::default(),
203 value_indices,
204 0,
205 false,
206 )
207 }
208
209 pub fn for_test(
210 store: S,
211 table_id: TableId,
212 columns: Vec<ColumnDesc>,
213 order_types: Vec<OrderType>,
214 pk_indices: Vec<usize>,
215 value_indices: Vec<usize>,
216 ) -> Self {
217 let output_column_ids = columns.iter().map(|c| c.column_id).collect();
218 Self::for_test_with_partial_columns(
219 store,
220 table_id,
221 columns,
222 output_column_ids,
223 order_types,
224 pk_indices,
225 value_indices,
226 )
227 }
228
229 #[allow(clippy::too_many_arguments)]
230 fn new_inner(
231 store: S,
232 table_id: TableId,
233 table_columns: Vec<ColumnDesc>,
234 output_column_ids: Vec<ColumnId>,
235 order_types: Vec<OrderType>,
236 pk_indices: Vec<usize>,
237 distribution: TableDistribution,
238 table_option: TableOption,
239 value_indices: Vec<usize>,
240 read_prefix_len_hint: usize,
241 versioned: bool,
242 ) -> Self {
243 assert_eq!(order_types.len(), pk_indices.len());
244
245 let (output_columns, output_indices) =
246 find_columns_by_ids(&table_columns, &output_column_ids);
247
248 let mut value_output_indices = vec![];
249 let mut key_output_indices = vec![];
250 let mut epoch_idx = None;
252
253 for idx in &output_indices {
254 if value_indices.contains(idx) {
255 value_output_indices.push(*idx);
256 } else if pk_indices.contains(idx) {
257 key_output_indices.push(*idx);
258 } else {
259 assert!(epoch_idx.is_none());
260 epoch_idx = Some(*idx);
261 }
262 }
263
264 let output_row_in_key_indices = key_output_indices
265 .iter()
266 .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
267 .collect_vec();
268 let schema = Schema::new(output_columns.iter().map(Into::into).collect());
269
270 let pk_data_types = pk_indices
271 .iter()
272 .map(|i| table_columns[*i].data_type.clone())
273 .collect();
274 let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
275 let (row_serde, mapping) = {
276 if versioned {
277 let value_output_indices_dedup = value_output_indices
278 .iter()
279 .unique()
280 .copied()
281 .collect::<Vec<_>>();
282 let output_row_in_value_output_indices_dedup = value_output_indices
283 .iter()
284 .map(|&di| {
285 value_output_indices_dedup
286 .iter()
287 .position(|&pi| di == pi)
288 .unwrap()
289 })
290 .collect_vec();
291 let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
292 let serde =
293 ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
294 (serde.into(), mapping)
295 } else {
296 let output_row_in_value_indices = value_output_indices
297 .iter()
298 .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
299 .collect_vec();
300 let mapping = ColumnMapping::new(output_row_in_value_indices);
301 let serde = BasicSerde::new(value_indices.into(), table_columns.into());
302 (serde.into(), mapping)
303 }
304 };
305
306 let key_output_indices = match key_output_indices.is_empty() {
307 true => None,
308 false => Some(key_output_indices),
309 };
310 Self {
311 table_id,
312 store,
313 schema,
314 pk_serializer,
315 output_indices,
316 key_output_indices,
317 value_output_indices,
318 output_row_in_key_indices,
319 mapping: Arc::new(mapping),
320 epoch_idx,
321 row_serde: Arc::new(row_serde),
322 pk_indices,
323 distribution,
324 table_option,
325 read_prefix_len_hint,
326 }
327 }
328}
329
330impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
331 pub fn pk_serializer(&self) -> &OrderedRowSerde {
332 &self.pk_serializer
333 }
334
335 pub fn schema(&self) -> &Schema {
336 &self.schema
337 }
338
339 pub fn pk_indices(&self) -> &[usize] {
340 &self.pk_indices
341 }
342
343 pub fn output_indices(&self) -> &[usize] {
344 &self.output_indices
345 }
346
347 pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
351 self.pk_indices
352 .iter()
353 .map(|&i| self.output_indices.iter().position(|&j| i == j))
354 .collect()
355 }
356
357 pub fn table_id(&self) -> TableId {
358 self.table_id
359 }
360
361 pub fn vnodes(&self) -> &Arc<Bitmap> {
362 self.distribution.vnodes()
363 }
364}
365impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
367 pub async fn get_row(
369 &self,
370 pk: impl Row,
371 wait_epoch: HummockReadEpoch,
372 ) -> StorageResult<Option<OwnedRow>> {
373 self.store
374 .try_wait_epoch(
375 wait_epoch,
376 TryWaitEpochOptions {
377 table_id: self.table_id,
378 },
379 )
380 .await?;
381 let serialized_pk = serialize_pk_with_vnode(
382 &pk,
383 &self.pk_serializer,
384 self.distribution.compute_vnode_by_pk(&pk),
385 );
386 assert!(pk.len() <= self.pk_indices.len());
387
388 let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
389 {
390 Some(serialized_pk.slice(VirtualNode::SIZE..))
391 } else {
392 None
393 };
394
395 let read_options = ReadOptions {
396 prefix_hint,
397 retention_seconds: self.table_option.retention_seconds,
398 cache_policy: CachePolicy::Fill(Hint::Normal),
399 ..Default::default()
400 };
401 let read_snapshot = self
402 .store
403 .new_read_snapshot(
404 wait_epoch,
405 NewReadSnapshotOptions {
406 table_id: self.table_id,
407 },
408 )
409 .await?;
410 match read_snapshot
411 .on_key_value(serialized_pk, read_options, move |key, value| {
412 let row = self.row_serde.deserialize(value)?;
413 Ok((key.epoch_with_gap.pure_epoch(), row))
414 })
415 .await?
416 {
417 Some((epoch, row)) => {
418 let result_row_in_value = self.mapping.project(OwnedRow::new(row));
419
420 match &self.key_output_indices {
421 Some(key_output_indices) => {
422 let result_row_in_key =
423 pk.project(&self.output_row_in_key_indices).into_owned_row();
424 let mut result_row_vec = vec![];
425 for idx in &self.output_indices {
426 if let Some(epoch_idx) = self.epoch_idx
427 && *idx == epoch_idx
428 {
429 let epoch = Epoch::from(epoch);
430 result_row_vec
431 .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
432 } else if self.value_output_indices.contains(idx) {
433 let item_position_in_value_indices = &self
434 .value_output_indices
435 .iter()
436 .position(|p| idx == p)
437 .unwrap();
438 result_row_vec.push(
439 result_row_in_value
440 .datum_at(*item_position_in_value_indices)
441 .to_owned_datum(),
442 );
443 } else {
444 let item_position_in_pk_indices =
445 key_output_indices.iter().position(|p| idx == p).unwrap();
446 result_row_vec.push(
447 result_row_in_key
448 .datum_at(item_position_in_pk_indices)
449 .to_owned_datum(),
450 );
451 }
452 }
453 let result_row = OwnedRow::new(result_row_vec);
454 Ok(Some(result_row))
455 }
456 None => match &self.epoch_idx {
457 Some(epoch_idx) => {
458 let mut result_row_vec = vec![];
459 for idx in &self.output_indices {
460 if idx == epoch_idx {
461 let epoch = Epoch::from(epoch);
462 result_row_vec.push(risingwave_common::types::Datum::from(
463 epoch.as_scalar(),
464 ));
465 } else {
466 let item_position_in_value_indices = &self
467 .value_output_indices
468 .iter()
469 .position(|p| idx == p)
470 .unwrap();
471 result_row_vec.push(
472 result_row_in_value
473 .datum_at(*item_position_in_value_indices)
474 .to_owned_datum(),
475 );
476 }
477 }
478 let result_row = OwnedRow::new(result_row_vec);
479 Ok(Some(result_row))
480 }
481 None => Ok(Some(result_row_in_value.into_owned_row())),
482 },
483 }
484 }
485 _ => Ok(None),
486 }
487 }
488
489 #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
491 pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
492 self.distribution.update_vnode_bitmap(new_vnodes)
493 }
494}
495
496impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
499 async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
500 self.next().await.transpose()
501 }
502}
503
504mod merge_vnode_stream {
505
506 use bytes::Bytes;
507 use futures::{Stream, StreamExt, TryStreamExt};
508 use risingwave_hummock_sdk::key::TableKey;
509
510 use crate::error::StorageResult;
511 use crate::table::KeyedRow;
512 use crate::table::merge_sort::{NodePeek, merge_sort};
513
514 pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
515 Single(RowSt),
516 Unordered(Vec<RowSt>),
517 Ordered(Vec<KeyedRowSt>),
518 }
519
520 pub(super) type MergedVnodeStream<
521 R: Send,
522 RowSt: Stream<Item = StorageResult<((), R)>> + Send,
523 KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
524 >
525 where
526 KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
527 = impl Stream<Item = StorageResult<R>> + Send;
528
529 pub(super) type SortKeyType = Bytes; #[define_opaque(MergedVnodeStream)]
532 pub(super) fn merge_stream<
533 R: Send,
534 RowSt: Stream<Item = StorageResult<((), R)>> + Send,
535 KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
536 >(
537 stream: VnodeStreamType<RowSt, KeyedRowSt>,
538 ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
539 where
540 KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
541 {
542 #[auto_enums::auto_enum(futures03::Stream)]
543 match stream {
544 VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
545 VnodeStreamType::Unordered(streams) => futures::stream::iter(
546 streams
547 .into_iter()
548 .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
549 )
550 .flatten_unordered(1024),
551 VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
552 Box::pin(stream.map_ok(|(key, row)| KeyedRow {
553 vnode_prefixed_key: TableKey(key),
554 row,
555 }))
556 }))
557 .map_ok(|keyed_row| keyed_row.row),
558 }
559 }
560}
561
562use merge_vnode_stream::*;
563
564async fn build_vnode_stream<
565 R: Send,
566 RowSt: Stream<Item = StorageResult<((), R)>> + Send,
567 KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
568 RowStFut: Future<Output = StorageResult<RowSt>>,
569 KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
570>(
571 row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
572 keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
573 vnodes: &[VirtualNode],
574 ordered: bool,
575) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
576where
577 KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
578{
579 let stream = match vnodes {
580 [] => unreachable!(),
581 [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
582 vnodes if !ordered => VnodeStreamType::Unordered(
584 try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
585 ),
586 vnodes => VnodeStreamType::Ordered(
588 try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
589 ),
590 };
591 Ok(merge_stream(stream))
592}
593
594impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
596 async fn iter_with_encoded_key_range(
599 &self,
600 prefix_hint: Option<Bytes>,
601 (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
602 wait_epoch: HummockReadEpoch,
603 vnode_hint: Option<VirtualNode>,
604 ordered: bool,
605 prefetch_options: PrefetchOptions,
606 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
607 {
608 let vnodes = match vnode_hint {
609 Some(vnode) => {
611 assert!(
612 self.distribution.vnodes().is_set(vnode.to_index()),
613 "vnode unset: {:?}, distribution: {:?}",
614 vnode,
615 self.distribution
616 );
617 vec![vnode]
618 }
619 None => self.distribution.vnodes().iter_vnodes().collect_vec(),
621 };
622
623 let read_snapshot = self
624 .store
625 .new_read_snapshot(
626 wait_epoch,
627 NewReadSnapshotOptions {
628 table_id: self.table_id,
629 },
630 )
631 .await?;
632
633 build_vnode_stream(
634 |vnode| {
635 self.iter_vnode_with_encoded_key_range(
636 &read_snapshot,
637 prefix_hint.clone(),
638 (start_bound.as_ref(), end_bound.as_ref()),
639 vnode,
640 prefetch_options,
641 )
642 },
643 |vnode| {
644 self.iter_vnode_with_encoded_key_range(
645 &read_snapshot,
646 prefix_hint.clone(),
647 (start_bound.as_ref(), end_bound.as_ref()),
648 vnode,
649 prefetch_options,
650 )
651 },
652 &vnodes,
653 ordered,
654 )
655 .await
656 }
657
658 async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
659 &self,
660 read_snapshot: &S::ReadSnapshot,
661 prefix_hint: Option<Bytes>,
662 encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
663 vnode: VirtualNode,
664 prefetch_options: PrefetchOptions,
665 ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
666 {
667 let cache_policy = match &encoded_key_range {
668 (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
671 _ => CachePolicy::Fill(Hint::Normal),
672 };
673
674 let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
675
676 {
677 let prefix_hint = prefix_hint.clone();
678 {
679 let read_options = ReadOptions {
680 prefix_hint,
681 retention_seconds: self.table_option.retention_seconds,
682 prefetch_options,
683 cache_policy,
684 };
685 let pk_serializer = match self.output_row_in_key_indices.is_empty() {
686 true => None,
687 false => Some(Arc::new(self.pk_serializer.clone())),
688 };
689 let iter = BatchTableInnerIterInner::new(
690 read_snapshot,
691 self.mapping.clone(),
692 self.epoch_idx,
693 pk_serializer,
694 self.output_indices.clone(),
695 self.key_output_indices.clone(),
696 self.value_output_indices.clone(),
697 self.output_row_in_key_indices.clone(),
698 self.row_serde.clone(),
699 table_key_range,
700 read_options,
701 )
702 .await?
703 .into_stream::<K>();
704 Ok(iter)
705 }
706 }
707 }
708
709 fn serialize_pk_bound(
711 &self,
712 pk_prefix: impl Row,
713 range_bound: Bound<&OwnedRow>,
714 is_start_bound: bool,
715 ) -> Bound<Bytes> {
716 match range_bound {
717 Included(k) => {
718 let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
719 let key = pk_prefix.chain(k);
720 let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
721 if is_start_bound {
722 Included(serialized_key)
723 } else {
724 end_bound_of_prefix(&serialized_key)
727 }
728 }
729 Excluded(k) => {
730 let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
731 let key = pk_prefix.chain(k);
732 let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
733 if is_start_bound {
734 let next_serialized_key = next_key(&serialized_key);
739 assert!(!next_serialized_key.is_empty());
740 Included(Bytes::from(next_serialized_key))
741 } else {
742 Excluded(serialized_key)
743 }
744 }
745 Unbounded => {
746 let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
747 let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
748 if pk_prefix.is_empty() {
749 Unbounded
750 } else if is_start_bound {
751 Included(serialized_pk_prefix)
752 } else {
753 end_bound_of_prefix(&serialized_pk_prefix)
754 }
755 }
756 }
757 }
758
759 async fn iter_with_pk_bounds(
761 &self,
762 epoch: HummockReadEpoch,
763 pk_prefix: impl Row,
764 range_bounds: impl RangeBounds<OwnedRow>,
765 ordered: bool,
766 prefetch_options: PrefetchOptions,
767 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
768 let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
769 let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
770 assert!(pk_prefix.len() <= self.pk_indices.len());
771 let pk_prefix_indices = (0..pk_prefix.len())
772 .map(|index| self.pk_indices[index])
773 .collect_vec();
774
775 let prefix_hint = if self.read_prefix_len_hint != 0
776 && self.read_prefix_len_hint <= pk_prefix.len()
777 {
778 let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
779 start_key
780 } else {
781 unreachable!()
782 };
783 let prefix_len = self
784 .pk_serializer
785 .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
786 Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
787 } else {
788 trace!(
789 "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
790 self.table_id, pk_prefix, pk_prefix_indices
791 );
792 None
793 };
794
795 trace!(
796 "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}",
797 self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
798 );
799
800 self.iter_with_encoded_key_range(
801 prefix_hint,
802 (start_key, end_key),
803 epoch,
804 self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
805 ordered,
806 prefetch_options,
807 )
808 .await
809 }
810
811 #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
813 async fn convert_row_stream_to_array_vec_stream(
814 iter: impl Stream<Item = StorageResult<OwnedRow>>,
815 schema: Schema,
816 chunk_size: usize,
817 ) {
818 use futures::{TryStreamExt, pin_mut};
819 use risingwave_common::util::iter_util::ZipEqFast;
820
821 pin_mut!(iter);
822
823 let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
824 let mut row_count = 0;
825
826 while let Some(row) = iter.try_next().await? {
827 row_count += 1;
828 let builders_ref =
830 builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
831 for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
832 builder.append(datum);
833 }
834 if row_count == chunk_size {
835 let columns: Vec<_> = builders
836 .take()
837 .unwrap()
838 .into_iter()
839 .map(|builder| builder.finish().into())
840 .collect();
841 yield (columns, row_count);
842 assert!(builders.is_none());
843 row_count = 0;
844 }
845 }
846
847 if let Some(builders) = builders {
848 assert_gt!(row_count, 0);
849 let columns: Vec<_> = builders
851 .into_iter()
852 .map(|builder| builder.finish().into())
853 .collect();
854 yield (columns, row_count);
855 }
856 }
857
858 async fn chunk_iter_with_pk_bounds(
861 &self,
862 epoch: HummockReadEpoch,
863 pk_prefix: impl Row,
864 range_bounds: impl RangeBounds<OwnedRow>,
865 ordered: bool,
866 chunk_size: usize,
867 prefetch_options: PrefetchOptions,
868 ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
869 let iter = self
870 .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
871 .await?;
872
873 Ok(Self::convert_row_stream_to_array_vec_stream(
874 iter,
875 self.schema.clone(),
876 chunk_size,
877 ))
878 }
879
880 pub async fn batch_iter_with_pk_bounds(
883 &self,
884 epoch: HummockReadEpoch,
885 pk_prefix: impl Row,
886 range_bounds: impl RangeBounds<OwnedRow>,
887 ordered: bool,
888 prefetch_options: PrefetchOptions,
889 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
890 self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
891 .await
892 }
893
894 pub async fn batch_iter(
896 &self,
897 epoch: HummockReadEpoch,
898 ordered: bool,
899 prefetch_options: PrefetchOptions,
900 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
901 self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
902 .await
903 }
904
905 pub async fn batch_iter_vnode(
906 &self,
907 epoch: HummockReadEpoch,
908 start_pk: Option<&OwnedRow>,
909 vnode: VirtualNode,
910 prefetch_options: PrefetchOptions,
911 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
912 {
913 let start_bound = if let Some(start_pk) = start_pk {
914 let mut bytes = BytesMut::new();
915 self.pk_serializer.serialize(start_pk, &mut bytes);
916 let bytes = bytes.freeze();
917 Included(bytes)
918 } else {
919 Unbounded
920 };
921 let read_snapshot = self
922 .store
923 .new_read_snapshot(
924 epoch,
925 NewReadSnapshotOptions {
926 table_id: self.table_id,
927 },
928 )
929 .await?;
930 Ok(self
931 .iter_vnode_with_encoded_key_range::<()>(
932 &read_snapshot,
933 None,
934 (start_bound.as_ref(), Unbounded),
935 vnode,
936 prefetch_options,
937 )
938 .await?
939 .map_ok(|(_, row)| row))
940 }
941
942 pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
943 self.store
944 .next_epoch(
945 epoch,
946 NextEpochOptions {
947 table_id: self.table_id,
948 },
949 )
950 .await
951 }
952
953 pub async fn batch_iter_vnode_log(
954 &self,
955 start_epoch: u64,
956 end_epoch: HummockReadEpoch,
957 start_pk: Option<&OwnedRow>,
958 vnode: VirtualNode,
959 ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
960 {
961 let start_bound = if let Some(start_pk) = start_pk {
962 let mut bytes = BytesMut::new();
963 self.pk_serializer.serialize(start_pk, &mut bytes);
964 let bytes = bytes.freeze();
965 Included(bytes)
966 } else {
967 Unbounded
968 };
969 let stream = self
970 .batch_iter_log_inner::<()>(
971 start_epoch,
972 end_epoch,
973 (start_bound.as_ref(), Unbounded),
974 vnode,
975 )
976 .await?;
977 Ok(stream.map_ok(|(_, row)| row))
978 }
979
980 pub async fn batch_iter_log_with_pk_bounds(
981 &self,
982 start_epoch: u64,
983 end_epoch: HummockReadEpoch,
984 ordered: bool,
985 range_bounds: impl RangeBounds<OwnedRow>,
986 pk_prefix: impl Row,
987 ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
988 let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
989 let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
990 let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
991 build_vnode_stream(
992 |vnode| {
993 self.batch_iter_log_inner(
994 start_epoch,
995 end_epoch,
996 (start_key.as_ref(), end_key.as_ref()),
997 vnode,
998 )
999 },
1000 |vnode| {
1001 self.batch_iter_log_inner(
1002 start_epoch,
1003 end_epoch,
1004 (start_key.as_ref(), end_key.as_ref()),
1005 vnode,
1006 )
1007 },
1008 &vnodes,
1009 ordered,
1010 )
1011 .await
1012 }
1013
1014 async fn batch_iter_log_inner<K: CopyFromSlice>(
1015 &self,
1016 start_epoch: u64,
1017 end_epoch: HummockReadEpoch,
1018 encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
1019 vnode: VirtualNode,
1020 ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
1021 {
1022 let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
1023 let read_options = ReadLogOptions {
1024 table_id: self.table_id,
1025 };
1026 let iter = BatchTableInnerIterLogInner::<S, SD>::new(
1027 &self.store,
1028 self.mapping.clone(),
1029 self.row_serde.clone(),
1030 table_key_range,
1031 read_options,
1032 start_epoch,
1033 end_epoch,
1034 )
1035 .await?
1036 .into_stream::<K>();
1037
1038 Ok(iter)
1039 }
1040
1041 pub async fn batch_chunk_iter_with_pk_bounds(
1044 &self,
1045 epoch: HummockReadEpoch,
1046 pk_prefix: impl Row,
1047 range_bounds: impl RangeBounds<OwnedRow>,
1048 ordered: bool,
1049 chunk_size: usize,
1050 prefetch_options: PrefetchOptions,
1051 ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
1052 let iter = self
1053 .chunk_iter_with_pk_bounds(
1054 epoch,
1055 pk_prefix,
1056 range_bounds,
1057 ordered,
1058 chunk_size,
1059 prefetch_options,
1060 )
1061 .await?;
1062
1063 Ok(iter.map(|item| {
1064 let (columns, row_count) = item?;
1065 Ok(DataChunk::new(columns, row_count))
1066 }))
1067 }
1068}
1069
1070struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
1072 iter: SI,
1074
1075 mapping: Arc<ColumnMapping>,
1076
1077 epoch_idx: Option<usize>,
1079
1080 row_deserializer: Arc<SD>,
1081
1082 pk_serializer: Option<Arc<OrderedRowSerde>>,
1084
1085 output_indices: Vec<usize>,
1086
1087 key_output_indices: Option<Vec<usize>>,
1089
1090 value_output_indices: Vec<usize>,
1092
1093 output_row_in_key_indices: Vec<usize>,
1095}
1096
1097impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
1098 #[allow(clippy::too_many_arguments)]
1100 async fn new<S>(
1101 store: &S,
1102 mapping: Arc<ColumnMapping>,
1103 epoch_idx: Option<usize>,
1104 pk_serializer: Option<Arc<OrderedRowSerde>>,
1105 output_indices: Vec<usize>,
1106 key_output_indices: Option<Vec<usize>>,
1107 value_output_indices: Vec<usize>,
1108 output_row_in_key_indices: Vec<usize>,
1109 row_deserializer: Arc<SD>,
1110 table_key_range: TableKeyRange,
1111 read_options: ReadOptions,
1112 ) -> StorageResult<Self>
1113 where
1114 S: StateStoreRead<Iter = SI>,
1115 {
1116 let iter = store.iter(table_key_range, read_options).await?;
1117 let iter = Self {
1118 iter,
1119 mapping,
1120 epoch_idx,
1121 row_deserializer,
1122 pk_serializer,
1123 output_indices,
1124 key_output_indices,
1125 value_output_indices,
1126 output_row_in_key_indices,
1127 };
1128 Ok(iter)
1129 }
1130
1131 #[try_stream(ok = (K, OwnedRow), error = StorageError)]
1133 async fn into_stream<K: CopyFromSlice>(mut self) {
1134 while let Some((k, v)) = self
1135 .iter
1136 .try_next()
1137 .instrument_await("storage_table_iter_next".verbose())
1138 .await?
1139 {
1140 let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
1141 let row = self.row_deserializer.deserialize(value)?;
1142 let result_row_in_value = self.mapping.project(OwnedRow::new(row));
1143 let row = match &self.key_output_indices {
1144 Some(key_output_indices) => {
1145 let result_row_in_key = match self.pk_serializer.clone() {
1146 Some(pk_serializer) => {
1147 let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;
1148
1149 pk.project(&self.output_row_in_key_indices).into_owned_row()
1150 }
1151 None => OwnedRow::empty(),
1152 };
1153
1154 let mut result_row_vec = vec![];
1155 for idx in &self.output_indices {
1156 if let Some(epoch_idx) = self.epoch_idx
1157 && *idx == epoch_idx
1158 {
1159 let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1160 result_row_vec
1161 .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1162 } else if self.value_output_indices.contains(idx) {
1163 let item_position_in_value_indices = &self
1164 .value_output_indices
1165 .iter()
1166 .position(|p| idx == p)
1167 .unwrap();
1168 result_row_vec.push(
1169 result_row_in_value
1170 .datum_at(*item_position_in_value_indices)
1171 .to_owned_datum(),
1172 );
1173 } else {
1174 let item_position_in_pk_indices =
1175 key_output_indices.iter().position(|p| idx == p).unwrap();
1176 result_row_vec.push(
1177 result_row_in_key
1178 .datum_at(item_position_in_pk_indices)
1179 .to_owned_datum(),
1180 );
1181 }
1182 }
1183 OwnedRow::new(result_row_vec)
1184 }
1185 None => match &self.epoch_idx {
1186 Some(epoch_idx) => {
1187 let mut result_row_vec = vec![];
1188 for idx in &self.output_indices {
1189 if idx == epoch_idx {
1190 let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1191 result_row_vec
1192 .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1193 } else {
1194 let item_position_in_value_indices = &self
1195 .value_output_indices
1196 .iter()
1197 .position(|p| idx == p)
1198 .unwrap();
1199 result_row_vec.push(
1200 result_row_in_value
1201 .datum_at(*item_position_in_value_indices)
1202 .to_owned_datum(),
1203 );
1204 }
1205 }
1206 OwnedRow::new(result_row_vec)
1207 }
1208 None => result_row_in_value.into_owned_row(),
1209 },
1210 };
1211 yield (K::copy_from_slice(table_key.as_ref()), row);
1212 }
1213 }
1214}
1215
1216struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
1218 iter: S::ChangeLogIter,
1220
1221 mapping: Arc<ColumnMapping>,
1222
1223 row_deserializer: Arc<SD>,
1224}
1225
1226impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
1227 #[allow(clippy::too_many_arguments)]
1229 async fn new(
1230 store: &S,
1231 mapping: Arc<ColumnMapping>,
1232 row_deserializer: Arc<SD>,
1233 table_key_range: TableKeyRange,
1234 read_options: ReadLogOptions,
1235 start_epoch: u64,
1236 end_epoch: HummockReadEpoch,
1237 ) -> StorageResult<Self> {
1238 store
1239 .try_wait_epoch(
1240 end_epoch,
1241 TryWaitEpochOptions {
1242 table_id: read_options.table_id,
1243 },
1244 )
1245 .await?;
1246 let iter = store
1247 .iter_log(
1248 (start_epoch, end_epoch.get_epoch()),
1249 table_key_range,
1250 read_options,
1251 )
1252 .await?;
1253 let iter = Self {
1254 iter,
1255 mapping,
1256 row_deserializer,
1257 };
1258 Ok(iter)
1259 }
1260
1261 fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
1263 self.iter.into_stream(move |(table_key, value)| {
1264 value
1265 .try_map(|value| {
1266 let full_row = self.row_deserializer.deserialize(value)?;
1267 let row = self
1268 .mapping
1269 .project(OwnedRow::new(full_row))
1270 .into_owned_row();
1271 Ok(row)
1272 })
1273 .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
1274 })
1275 }
1276}