use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::{Bytes, BytesMut};
use foyer::CacheHint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
};
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;

use crate::StateStore;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{ColumnMapping, find_columns_by_ids};
use crate::store::{
    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
    StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};
/// `BatchTableInner` is an interface for batch (point get / range scan) access to relational
/// data stored in a KV state store with a row-based encoding.
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id of this table.
    table_id: TableId,

    /// State store backend.
    store: S,

    /// Schema of the output columns, i.e. how this table is viewed by the executor that scans it.
    schema: Schema,

    /// Serializer and deserializer for the primary key.
    pk_serializer: OrderedRowSerde,

    /// Indices of the output columns in the table's full column list.
    output_indices: Vec<usize>,

    /// The output indices that come from the key part, if any.
    key_output_indices: Option<Vec<usize>>,

    /// The output indices that come from the value part.
    value_output_indices: Vec<usize>,

    /// Positions of the key-part output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,

    /// Mapping from output column to its position in the deserialized value row.
    mapping: Arc<ColumnMapping>,

    /// Index of the epoch (`_rw_timestamp`) column in the output columns, if requested.
    epoch_idx: Option<usize>,

    /// Serde for the value (non-key) part of the row.
    row_serde: Arc<SD>,

    /// Indices of the primary key columns, based on all columns of the table rather than the
    /// output columns.
    pk_indices: Vec<usize>,

    distribution: TableDistribution,

    /// Table options from the catalog, e.g. data retention.
    table_option: TableOption,

    read_prefix_len_hint: usize,
}

/// A [`BatchTableInner`] whose value encoding is chosen at runtime: column-aware encoding for
/// versioned tables, basic encoding otherwise.
pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BatchTableInner").finish_non_exhaustive()
    }
}

impl<S: StateStore> BatchTableInner<S, EitherSerde> {
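    /// Creates a [`BatchTableInner`] that outputs only the columns in `output_column_ids`,
    /// taking the full table layout from the `StorageTableDesc` of the plan.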
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = TableId {
            table_id: table_desc.table_id,
        };
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        // Index of the epoch (`_rw_timestamp`) column in the output, if requested.
        let mut epoch_idx = None;

        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                // An output column that is neither in the value nor in the key can only be
                // the epoch column, and there is at most one of it.
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                // Versioned tables use the column-aware encoding, which tolerates schema
                // changes and only decodes the requested columns.
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                // Non-versioned tables use the basic encoding, which decodes all value columns.
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    pub fn pk_serializer(&self) -> &OrderedRowSerde {
        &self.pk_serializer
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn output_indices(&self) -> &[usize] {
        &self.output_indices
    }

    /// Gets the indices of the primary key columns in the output columns. Returns `None` if any
    /// primary key column is not included in the output.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
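    /// Point get: fetches a single row by its full primary key at the given epoch, returning
    /// `None` if the key does not exist.
    ///
    /// A minimal usage sketch (`table`, `epoch` and the pk value are hypothetical; the example
    /// is not compiled here):
    ///
    /// ```ignore
    /// let pk = OwnedRow::new(vec![Some(1i32.into())]);
    /// if let Some(row) = table.get_row(&pk, HummockReadEpoch::Committed(epoch)).await? {
    ///     println!("found: {:?}", row);
    /// }
    /// ```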
    pub async fn get_row(
        &self,
        pk: impl Row,
        wait_epoch: HummockReadEpoch,
    ) -> StorageResult<Option<OwnedRow>> {
        let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
        let read_committed = wait_epoch.is_read_committed();
        self.store
            .try_wait_epoch(
                wait_epoch,
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let serialized_pk = serialize_pk_with_vnode(
            &pk,
            &self.pk_serializer,
            self.distribution.compute_vnode_by_pk(&pk),
        );
        assert!(pk.len() <= self.pk_indices.len());

        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
        {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            table_id: self.table_id,
            read_version_from_backup: read_backup,
            read_committed,
            cache_policy: CachePolicy::Fill(CacheHint::Normal),
            ..Default::default()
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        match read_snapshot
            .get_keyed_row(serialized_pk, read_options)
            .await?
        {
            Some((full_key, value)) => {
                let row = self.row_serde.deserialize(&value)?;
                let result_row_in_value = self.mapping.project(OwnedRow::new(row));

                match &self.key_output_indices {
                    Some(key_output_indices) => {
                        // Some output columns come from the key: stitch the output row together
                        // from the key part, the value part, and possibly the epoch column.
                        let result_row_in_key =
                            pk.project(&self.output_row_in_key_indices).into_owned_row();
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if let Some(epoch_idx) = self.epoch_idx
                                && *idx == epoch_idx
                            {
                                let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else if self.value_output_indices.contains(idx) {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            } else {
                                let item_position_in_pk_indices =
                                    key_output_indices.iter().position(|p| idx == p).unwrap();
                                result_row_vec.push(
                                    result_row_in_key
                                        .datum_at(item_position_in_pk_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        let result_row = OwnedRow::new(result_row_vec);
                        Ok(Some(result_row))
                    }
                    None => match &self.epoch_idx {
                        Some(epoch_idx) => {
                            // No key columns in the output, but the epoch column is requested.
                            let mut result_row_vec = vec![];
                            for idx in &self.output_indices {
                                if idx == epoch_idx {
                                    let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
                                    result_row_vec.push(risingwave_common::types::Datum::from(
                                        epoch.as_scalar(),
                                    ));
                                } else {
                                    let item_position_in_value_indices = &self
                                        .value_output_indices
                                        .iter()
                                        .position(|p| idx == p)
                                        .unwrap();
                                    result_row_vec.push(
                                        result_row_in_value
                                            .datum_at(*item_position_in_value_indices)
                                            .to_owned_datum(),
                                    );
                                }
                            }
                            let result_row = OwnedRow::new(result_row_vec);
                            Ok(Some(result_row))
                        }
                        None => Ok(Some(result_row_in_value.into_owned_row())),
                    },
                }
            }
            _ => Ok(None),
        }
    }

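    /// Updates the vnode bitmap of the table and returns the previous one, so that the caller
    /// can decide how to adjust any vnode-keyed caches.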
    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
}

/// Blanket implementation: any sendable stream of rows can serve as a [`TableIter`].
impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
        self.next().await.transpose()
    }
}

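/// Helpers for merging the per-vnode streams of a scan into a single stream. A single-vnode scan
/// is passed through as-is; an unordered multi-vnode scan flattens the streams concurrently; an
/// ordered multi-vnode scan merge-sorts the streams by their serialized keys.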
mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        Single(RowSt),
        Unordered(Vec<RowSt>),
        Ordered(Vec<KeyedRowSt>),
    }

    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    pub(super) type SortKeyType = Bytes;

    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}

use merge_vnode_stream::*;

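/// Builds the per-vnode streams for `vnodes` and merges them with [`merge_stream`]. For a single
/// vnode or an unordered scan the plain `row_stream_fn` suffices; an ordered scan over multiple
/// vnodes needs the serialized keys for merge-sorting, hence `keyed_row_stream_fn`.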
async fn build_vnode_stream<
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let stream = match vnodes {
        [] => unreachable!(),
        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
        // Unordered: plain row streams are enough, rows are emitted as they arrive.
        vnodes if !ordered => VnodeStreamType::Unordered(
            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
        ),
        // Ordered: keyed streams are merge-sorted on the serialized key.
        vnodes => VnodeStreamType::Ordered(
            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
        ),
    };
    Ok(merge_stream(stream))
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
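    /// Iterates over the table with the key range already encoded, scanning either the vnode in
    /// `vnode_hint` or every vnode owned by this table, and merging the per-vnode streams.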
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // If `vnode_hint` is set, only the single vnode needs to be scanned.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            // Otherwise, scan all vnodes owned by this table.
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;

        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    wait_epoch,
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    wait_epoch,
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
        &self,
        read_snapshot: &S::ReadSnapshot,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
    {
        let cache_policy = match &encoded_key_range {
            // To prevent unbounded range scans from polluting the block cache, fill it with low
            // priority when the range is unbounded on either side.
            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CacheHint::Low),
            _ => CachePolicy::Fill(CacheHint::Normal),
        };

        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);

        let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
        let read_committed = wait_epoch.is_read_committed();
        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            table_id: self.table_id,
            read_version_from_backup: read_backup,
            read_committed,
            prefetch_options,
            cache_policy,
        };
        let pk_serializer = match self.output_row_in_key_indices.is_empty() {
            true => None,
            false => Some(Arc::new(self.pk_serializer.clone())),
        };
        let iter = BatchTableInnerIterInner::new(
            read_snapshot,
            self.mapping.clone(),
            self.epoch_idx,
            pk_serializer,
            self.output_indices.clone(),
            self.key_output_indices.clone(),
            self.value_output_indices.clone(),
            self.output_row_in_key_indices.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
        )
        .await?
        .into_stream::<K>();
        Ok(iter)
    }

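    /// Serializes one bound of a pk range, chaining the bound row onto a (possibly empty) pk
    /// prefix. For example, with pk `(a, b)`, prefix `(1,)` and end bound `Included((2,))`, the
    /// result is the `end_bound_of_prefix` of the serialization of `(1, 2)`, so that every pk
    /// starting with that prefix stays in range.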
    fn serialize_pk_bound(
        &self,
        pk_prefix: impl Row,
        range_bound: Bound<&OwnedRow>,
        is_start_bound: bool,
    ) -> Bound<Bytes> {
        match range_bound {
            Included(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    Included(serialized_key)
                } else {
                    // An included end bound on a pk prefix covers every row starting with that
                    // prefix, hence the end bound of the serialized prefix.
                    end_bound_of_prefix(&serialized_key)
                }
            }
            Excluded(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    // Use the immediate successor of the serialized prefix as an included start
                    // bound, so the excluded prefix and every pk extending it are skipped.
                    let next_serialized_key = next_key(&serialized_key);
                    assert!(!next_serialized_key.is_empty());
                    Included(Bytes::from(next_serialized_key))
                } else {
                    Excluded(serialized_key)
                }
            }
            Unbounded => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
                if pk_prefix.is_empty() {
                    Unbounded
                } else if is_start_bound {
                    Included(serialized_pk_prefix)
                } else {
                    end_bound_of_prefix(&serialized_pk_prefix)
                }
            }
        }
    }

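    /// Iterates with a pk prefix plus sub-range bounds on the following pk column: serializes
    /// the bounds, derives a prefix hint when the read prefix is covered, and delegates to
    /// [`Self::iter_with_encoded_key_range`].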
    async fn iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        assert!(pk_prefix.len() <= self.pk_indices.len());
        let pk_prefix_indices = (0..pk_prefix.len())
            .map(|index| self.pk_indices[index])
            .collect_vec();

        let prefix_hint = if self.read_prefix_len_hint != 0
            && self.read_prefix_len_hint <= pk_prefix.len()
        {
            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
                start_key
            } else {
                // With a non-empty pk prefix, `serialize_pk_bound` always returns an included
                // start bound.
                unreachable!()
            };
            let prefix_len = self
                .pk_serializer
                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
        } else {
            trace!(
                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
                self.table_id, pk_prefix, pk_prefix_indices
            );
            None
        };

        trace!(
            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}",
            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
        );

        self.iter_with_encoded_key_range(
            prefix_hint,
            (start_key, end_key),
            epoch,
            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
            ordered,
            prefetch_options,
        )
        .await
    }

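    /// Converts a stream of rows into a stream of column arrays, yielding a batch of
    /// `chunk_size` rows at a time (the last batch may be smaller).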
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // Create array builders lazily, so an empty stream yields no chunk at all.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            // Yield the last, partially filled chunk.
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }

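    /// Iterates with a pk prefix and sub-range bounds, batching the output rows into column
    /// arrays of at most `chunk_size` rows.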
    async fn chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
        let iter = self
            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await?;

        Ok(Self::convert_row_stream_to_array_vec_stream(
            iter,
            self.schema.clone(),
            chunk_size,
        ))
    }

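    /// Constructs a row stream with the given pk prefix and bounds on the next pk column, under
    /// the given epoch.
    ///
    /// A minimal sketch of scanning a sub-range (`table`, `epoch` and the bound rows are
    /// hypothetical; not compiled here):
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let mut stream = table
    ///     .batch_iter_with_pk_bounds(
    ///         epoch,
    ///         row::empty(),          // no pk prefix
    ///         lower_row..=upper_row, // bounds on the first pk column
    ///         true,                  // ordered
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// while let Some(row) = stream.try_next().await? {
    ///     /* consume `row` */
    /// }
    /// ```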
    pub async fn batch_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await
    }

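    /// Constructs a row stream that scans the whole table under the given epoch.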
    pub async fn batch_iter(
        &self,
        epoch: HummockReadEpoch,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
            .await
    }

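    /// Scans a single vnode, starting from `start_pk` (inclusive) if given, otherwise from the
    /// beginning of the vnode.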
    pub async fn batch_iter_vnode(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        Ok(self
            .iter_vnode_with_encoded_key_range::<()>(
                &read_snapshot,
                None,
                (start_bound.as_ref(), Unbounded),
                epoch,
                vnode,
                prefetch_options,
            )
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
        self.store
            .next_epoch(
                epoch,
                NextEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

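    /// Builds a change-log stream over one vnode for the epoch range from `start_epoch` to
    /// `end_epoch`, starting from `start_pk` (inclusive) if given.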
    async fn batch_iter_log_inner<K: CopyFromSlice>(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + use<K, S, SD>> {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let table_key_range =
            prefixed_range_with_vnode::<&Bytes>((start_bound.as_ref(), Unbounded), vnode);
        let read_options = ReadLogOptions {
            table_id: self.table_id,
        };
        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
            &self.store,
            self.mapping.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
            start_epoch,
            end_epoch,
        )
        .await?
        .into_stream::<K>();

        Ok(iter)
    }

    pub async fn batch_iter_vnode_log(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + use<S, SD>> {
        let stream = self
            .batch_iter_log_inner::<()>(start_epoch, end_epoch, start_pk, vnode)
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

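    /// Builds a change-log stream over all vnodes of this table for the given epoch range,
    /// merge-sorted by key when `ordered` is set.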
    pub async fn batch_iter_log_with_pk_bounds(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        ordered: bool,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
        build_vnode_stream(
            |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
            |vnode| self.batch_iter_log_inner(start_epoch, end_epoch, None, vnode),
            &vnodes,
            ordered,
        )
        .await
    }

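    /// Iterates with a pk prefix and sub-range bounds, returning assembled `DataChunk`s of at
    /// most `chunk_size` rows each.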
    pub async fn batch_chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
        let iter = self
            .chunk_iter_with_pk_bounds(
                epoch,
                pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                prefetch_options,
            )
            .await?;

        Ok(iter.map(|item| {
            let (columns, row_count) = item?;
            Ok(DataChunk::new(columns, row_count))
        }))
    }
}

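/// [`BatchTableInnerIterInner`] iterates on the storage table, deserializing each key-value pair
/// into an output row according to the configured output indices.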
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw key-value pairs from storage.
    iter: SI,

    /// Mapping from output column to its position in the deserialized value row.
    mapping: Arc<ColumnMapping>,

    /// Index of the epoch (`_rw_timestamp`) column in the output columns, if requested.
    epoch_idx: Option<usize>,

    row_deserializer: Arc<SD>,

    /// Used for deserializing the key part of the output row, if some output columns come from
    /// the primary key.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    output_indices: Vec<usize>,

    /// The output indices that come from the key part, if any.
    key_output_indices: Option<Vec<usize>>,

    /// The output indices that come from the value part.
    value_output_indices: Vec<usize>,

    /// Positions of the key-part output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,
}

impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
    /// Creates a [`BatchTableInnerIterInner`] by opening a state store iterator over the given
    /// key range.
    #[allow(clippy::too_many_arguments)]
    async fn new<S>(
        store: &S,
        mapping: Arc<ColumnMapping>,
        epoch_idx: Option<usize>,
        pk_serializer: Option<Arc<OrderedRowSerde>>,
        output_indices: Vec<usize>,
        key_output_indices: Option<Vec<usize>>,
        value_output_indices: Vec<usize>,
        output_row_in_key_indices: Vec<usize>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self>
    where
        S: StateStoreRead<Iter = SI>,
    {
        let iter = store.iter(table_key_range, read_options).await?;
        let iter = Self {
            iter,
            mapping,
            epoch_idx,
            row_deserializer,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
        };
        Ok(iter)
    }

    /// Yields rows from the inner iterator, assembling each output row from the value part, the
    /// key part, and possibly the epoch column, in output-column order.
    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
    async fn into_stream<K: CopyFromSlice>(mut self) {
        while let Some((k, v)) = self
            .iter
            .try_next()
            .instrument_await("storage_table_iter_next".verbose())
            .await?
        {
            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
            let row = self.row_deserializer.deserialize(value)?;
            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
            let row = match &self.key_output_indices {
                Some(key_output_indices) => {
                    let result_row_in_key = match self.pk_serializer.clone() {
                        Some(pk_serializer) => {
                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;

                            pk.project(&self.output_row_in_key_indices).into_owned_row()
                        }
                        None => OwnedRow::empty(),
                    };

                    let mut result_row_vec = vec![];
                    for idx in &self.output_indices {
                        if let Some(epoch_idx) = self.epoch_idx
                            && *idx == epoch_idx
                        {
                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                            result_row_vec
                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                        } else if self.value_output_indices.contains(idx) {
                            let item_position_in_value_indices = &self
                                .value_output_indices
                                .iter()
                                .position(|p| idx == p)
                                .unwrap();
                            result_row_vec.push(
                                result_row_in_value
                                    .datum_at(*item_position_in_value_indices)
                                    .to_owned_datum(),
                            );
                        } else {
                            let item_position_in_pk_indices =
                                key_output_indices.iter().position(|p| idx == p).unwrap();
                            result_row_vec.push(
                                result_row_in_key
                                    .datum_at(item_position_in_pk_indices)
                                    .to_owned_datum(),
                            );
                        }
                    }
                    OwnedRow::new(result_row_vec)
                }
                None => match &self.epoch_idx {
                    Some(epoch_idx) => {
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if idx == epoch_idx {
                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        OwnedRow::new(result_row_vec)
                    }
                    None => result_row_in_value.into_owned_row(),
                },
            };
            yield (K::copy_from_slice(table_key.as_ref()), row);
        }
    }
}

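/// [`BatchTableInnerIterLogInner`] iterates over the change log of the storage table,
/// deserializing each change-log value into a [`ChangeLogRow`].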
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw change-log entries from storage.
    iter: S::ChangeLogIter,

    /// Mapping from output column to its position in the deserialized value row.
    mapping: Arc<ColumnMapping>,

    row_deserializer: Arc<SD>,
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
    /// Creates a [`BatchTableInnerIterLogInner`], waiting for `end_epoch` to become readable
    /// before opening the change-log iterator.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        store: &S,
        mapping: Arc<ColumnMapping>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadLogOptions,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
    ) -> StorageResult<Self> {
        store
            .try_wait_epoch(
                end_epoch,
                TryWaitEpochOptions {
                    table_id: read_options.table_id,
                },
            )
            .await?;
        let iter = store
            .iter_log(
                (start_epoch, end_epoch.get_epoch()),
                table_key_range,
                read_options,
            )
            .await?;
        let iter = Self {
            iter,
            mapping,
            row_deserializer,
        };
        Ok(iter)
    }

    /// Yields change-log rows, projecting each deserialized value onto the output columns.
    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
        self.iter.into_stream(move |(table_key, value)| {
            value
                .try_map(|value| {
                    let full_row = self.row_deserializer.deserialize(value)?;
                    let row = self
                        .mapping
                        .project(OwnedRow::new(full_row))
                        .into_owned_row();
                    Ok(row)
                })
                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
        })
    }
}