use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::{Bytes, BytesMut};
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
};
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;

use crate::StateStore;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{ColumnMapping, find_columns_by_ids};
use crate::store::{
    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
    StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};

/// A read-only table backed by a [`StateStore`], used for batch scans and point gets.
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id of this table.
    table_id: TableId,

    /// State store backend.
    store: S,

    /// Schema of the output columns, i.e., the columns to be read.
    schema: Schema,

    /// Used for serializing and deserializing the primary key.
    pk_serializer: OrderedRowSerde,

    /// Indices of the columns to output, in the table's column order.
    output_indices: Vec<usize>,

    /// Output column indices that are read from the key part of the storage row.
    key_output_indices: Option<Vec<usize>>,

    /// Output column indices that are read from the value part of the storage row.
    value_output_indices: Vec<usize>,

    /// Positions of the key output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,

    /// Maps the deserialized value columns to output positions.
    mapping: Arc<ColumnMapping>,

    /// The position of the epoch (`_rw_timestamp`) column in the output, if requested.
    epoch_idx: Option<usize>,

    /// Serializer and deserializer for the value part of the storage row.
    row_serde: Arc<SD>,

    /// Indices of primary key columns.
    pk_indices: Vec<usize>,

    /// How the table is distributed across virtual nodes.
    distribution: TableDistribution,

    /// Table-level options, e.g., data retention.
    table_option: TableOption,

    /// Length of the key prefix that can serve as a bloom-filter hint when reading.
    read_prefix_len_hint: usize,
}

pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BatchTableInner").finish_non_exhaustive()
    }
}

impl<S: StateStore> BatchTableInner<S, EitherSerde> {
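    /// Creates a table for batch reads that outputs only a subset of columns.
    ///
    /// A usage sketch (hypothetical bindings, not from the original source; assumes
    /// `store` implements [`StateStore`] and `table_desc` is fetched from the catalog):
    ///
    /// ```ignore
    /// let table = BatchTable::new_partial(
    ///     store,
    ///     vec![ColumnId::new(0), ColumnId::new(2)], // output columns
    ///     Some(vnodes),                             // vnode bitmap owned by this reader
    ///     &table_desc,
    /// );
    /// ```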
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = TableId {
            table_id: table_desc.table_id,
        };
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        // Classify each output column: it is read from the value part, from the key
        // part, or it is the epoch column.
        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        let mut epoch_idx = None;

        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    pub fn pk_serializer(&self) -> &OrderedRowSerde {
        &self.pk_serializer
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn output_indices(&self) -> &[usize] {
        &self.output_indices
    }

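    /// Returns the indices of the primary key columns within the output columns,
    /// or `None` if any primary key column is not included in the output.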
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
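    /// Fetches a single row by its primary key, waiting until `wait_epoch` is
    /// readable first.
    ///
    /// A point-get sketch (hypothetical bindings; assumes `table` and a committed
    /// `epoch` are in scope and `pk` covers the full primary key):
    ///
    /// ```ignore
    /// let row: Option<OwnedRow> = table
    ///     .get_row(&pk, HummockReadEpoch::Committed(epoch))
    ///     .await?;
    /// ```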
    pub async fn get_row(
        &self,
        pk: impl Row,
        wait_epoch: HummockReadEpoch,
    ) -> StorageResult<Option<OwnedRow>> {
        self.store
            .try_wait_epoch(
                wait_epoch,
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let serialized_pk = serialize_pk_with_vnode(
            &pk,
            &self.pk_serializer,
            self.distribution.compute_vnode_by_pk(&pk),
        );
        assert!(pk.len() <= self.pk_indices.len());

        // The serialized key can serve as a bloom-filter hint only when it covers
        // exactly `read_prefix_len_hint` key columns.
        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
        {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let row_serde = self.row_serde.clone();
        match read_snapshot
            .on_key_value(serialized_pk, read_options, move |key, value| {
                let row = row_serde.deserialize(value)?;
                Ok((key.epoch_with_gap.pure_epoch(), row))
            })
            .await?
        {
            Some((epoch, row)) => {
                let result_row_in_value = self.mapping.project(OwnedRow::new(row));

                match &self.key_output_indices {
                    Some(key_output_indices) => {
                        let result_row_in_key =
                            pk.project(&self.output_row_in_key_indices).into_owned_row();
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if let Some(epoch_idx) = self.epoch_idx
                                && *idx == epoch_idx
                            {
                                let epoch = Epoch::from(epoch);
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else if self.value_output_indices.contains(idx) {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            } else {
                                let item_position_in_pk_indices =
                                    key_output_indices.iter().position(|p| idx == p).unwrap();
                                result_row_vec.push(
                                    result_row_in_key
                                        .datum_at(item_position_in_pk_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        let result_row = OwnedRow::new(result_row_vec);
                        Ok(Some(result_row))
                    }
                    None => match &self.epoch_idx {
                        Some(epoch_idx) => {
                            let mut result_row_vec = vec![];
                            for idx in &self.output_indices {
                                if idx == epoch_idx {
                                    let epoch = Epoch::from(epoch);
                                    result_row_vec.push(risingwave_common::types::Datum::from(
                                        epoch.as_scalar(),
                                    ));
                                } else {
                                    let item_position_in_value_indices = &self
                                        .value_output_indices
                                        .iter()
                                        .position(|p| idx == p)
                                        .unwrap();
                                    result_row_vec.push(
                                        result_row_in_value
                                            .datum_at(*item_position_in_value_indices)
                                            .to_owned_datum(),
                                    );
                                }
                            }
                            let result_row = OwnedRow::new(result_row_vec);
                            Ok(Some(result_row))
                        }
                        None => Ok(Some(result_row_in_value.into_owned_row())),
                    },
                }
            }
            _ => Ok(None),
        }
    }

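    /// Replaces the vnode bitmap of this table (e.g., on scaling) and returns the
    /// previous bitmap, so the caller can decide how to adjust its cache.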
    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
}

/// Any fallible stream of rows can be used as a [`TableIter`].
impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
        self.next().await.transpose()
    }
}

mod merge_vnode_stream {
    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    /// A per-vnode read is either the single stream of a singleton read, a set of
    /// streams to be flattened without ordering guarantees, or a set of key-ordered
    /// streams to be merge-sorted.
    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        Single(RowSt),
        Unordered(Vec<RowSt>),
        Ordered(Vec<KeyedRowSt>),
    }

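    /// The unified return type of [`merge_stream`]. This is a type alias impl
    /// trait (TAIT): the opaque type is defined by the `#[define_opaque]` function
    /// below, so all three merge strategies share one nameable stream type.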
    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    pub(super) type SortKeyType = Bytes;

    #[define_opaque(MergedVnodeStream)]
    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}

use merge_vnode_stream::*;

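/// Builds one merged stream over the given vnodes: a single vnode needs no merging,
/// an unordered scan flattens the per-vnode streams concurrently, and an ordered
/// scan merge-sorts key-ordered streams so rows come out in key order.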
async fn build_vnode_stream<
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let stream = match vnodes {
        [] => unreachable!(),
        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
        // Avoid the overhead of merge sorting when the output does not need to be
        // ordered.
        vnodes if !ordered => VnodeStreamType::Unordered(
            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
        ),
        vnodes => VnodeStreamType::Ordered(
            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
        ),
    };
    Ok(merge_stream(stream))
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Gets a stream of rows within the given encoded key range, merged across the
    /// relevant vnodes.
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // If `vnode_hint` is set, only that single vnode needs to be accessed.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            // Otherwise, all vnodes owned by this table must be accessed.
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;

        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
        &self,
        read_snapshot: &S::ReadSnapshot,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
    {
        let cache_policy = match &encoded_key_range {
            // To prevent unbounded range scans from polluting the block cache, fill
            // it with low priority.
            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
            _ => CachePolicy::Fill(Hint::Normal),
        };

        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);

        {
            let prefix_hint = prefix_hint.clone();
            {
                let read_options = ReadOptions {
                    prefix_hint,
                    retention_seconds: self.table_option.retention_seconds,
                    prefetch_options,
                    cache_policy,
                };
                let pk_serializer = match self.output_row_in_key_indices.is_empty() {
                    true => None,
                    false => Some(Arc::new(self.pk_serializer.clone())),
                };
                let iter = BatchTableInnerIterInner::new(
                    read_snapshot,
                    self.mapping.clone(),
                    self.epoch_idx,
                    pk_serializer,
                    self.output_indices.clone(),
                    self.key_output_indices.clone(),
                    self.value_output_indices.clone(),
                    self.output_row_in_key_indices.clone(),
                    self.row_serde.clone(),
                    table_key_range,
                    read_options,
                )
                .await?
                .into_stream::<K>();
                Ok(iter)
            }
        }
    }

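    /// Serializes one end of a primary-key range into an encoded storage key bound.
    ///
    /// The encoded bound is `pk_prefix` chained with the user-provided bound over
    /// the following key columns. Inclusive end bounds and non-empty `Unbounded`
    /// prefixes are widened with `end_bound_of_prefix` so that every key sharing
    /// the serialized prefix stays inside the range; an excluded start bound is
    /// advanced with `next_key` so that every key sharing the excluded prefix is
    /// skipped.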
    fn serialize_pk_bound(
        &self,
        pk_prefix: impl Row,
        range_bound: Bound<&OwnedRow>,
        is_start_bound: bool,
    ) -> Bound<Bytes> {
        match range_bound {
            Included(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    Included(serialized_key)
                } else {
                    // Use the end bound of the prefix so that keys sharing the
                    // serialized prefix are not missed.
                    end_bound_of_prefix(&serialized_key)
                }
            }
            Excluded(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    // Transform the excluded start bound into an included one
                    // starting from `next_key`, which also skips every key that has
                    // `serialized_key` as its prefix.
                    let next_serialized_key = next_key(&serialized_key);
                    assert!(!next_serialized_key.is_empty());
                    Included(Bytes::from(next_serialized_key))
                } else {
                    Excluded(serialized_key)
                }
            }
            Unbounded => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
                if pk_prefix.is_empty() {
                    Unbounded
                } else if is_start_bound {
                    Included(serialized_pk_prefix)
                } else {
                    end_bound_of_prefix(&serialized_pk_prefix)
                }
            }
        }
    }

    /// Iterates over rows whose primary key starts with `pk_prefix`, restricted by
    /// `range_bounds` on the following key column(s).
    async fn iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        assert!(pk_prefix.len() <= self.pk_indices.len());
        let pk_prefix_indices = (0..pk_prefix.len())
            .map(|index| self.pk_indices[index])
            .collect_vec();

        let prefix_hint = if self.read_prefix_len_hint != 0
            && self.read_prefix_len_hint <= pk_prefix.len()
        {
            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
                start_key
            } else {
                unreachable!()
            };
            let prefix_len = self
                .pk_serializer
                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
        } else {
            trace!(
                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
                self.table_id, pk_prefix, pk_prefix_indices
            );
            None
        };

        trace!(
            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}",
            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
        );

        self.iter_with_encoded_key_range(
            prefix_hint,
            (start_key, end_key),
            epoch,
            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
            ordered,
            prefetch_options,
        )
        .await
    }

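    /// Batches a row stream into column arrays of at most `chunk_size` rows each,
    /// yielding each batch together with its actual row count (the final batch may
    /// be smaller).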
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // Lazily initialize the array builders on the first row.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            // Yield the remaining rows as the last, possibly partial, chunk.
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }

    /// Iterates on the table with the given pk prefix and range bounds, returning
    /// chunks of at most `chunk_size` rows.
    async fn chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
        let iter = self
            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await?;

        Ok(Self::convert_row_stream_to_array_vec_stream(
            iter,
            self.schema.clone(),
            chunk_size,
        ))
    }

    /// Iterates on the table with the given pk prefix and range bounds.
    pub async fn batch_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await
    }

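    /// Constructs a stream of all rows in the table at the given epoch.
    ///
    /// A full-scan sketch (hypothetical bindings; assumes `table` and a committed
    /// `epoch` are in scope):
    ///
    /// ```ignore
    /// let stream = table
    ///     .batch_iter(
    ///         HummockReadEpoch::Committed(epoch),
    ///         false, // no ordering guarantee across vnodes
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// futures::pin_mut!(stream);
    /// while let Some(row) = stream.try_next().await? {
    ///     // process `row`
    /// }
    /// ```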
    pub async fn batch_iter(
        &self,
        epoch: HummockReadEpoch,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
            .await
    }

    pub async fn batch_iter_vnode(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        Ok(self
            .iter_vnode_with_encoded_key_range::<()>(
                &read_snapshot,
                None,
                (start_bound.as_ref(), Unbounded),
                vnode,
                prefetch_options,
            )
            .await?
            .map_ok(|(_, row)| row))
    }

    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
        self.store
            .next_epoch(
                epoch,
                NextEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

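    /// Iterates over the change log of a single vnode between `start_epoch` and
    /// `end_epoch`, optionally starting from `start_pk`, yielding
    /// [`ChangeLogRow`]s instead of plain rows.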
    pub async fn batch_iter_vnode_log(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let stream = self
            .batch_iter_log_inner::<()>(
                start_epoch,
                end_epoch,
                (start_bound.as_ref(), Unbounded),
                vnode,
            )
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    pub async fn batch_iter_log_with_pk_bounds(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        ordered: bool,
        range_bounds: impl RangeBounds<OwnedRow>,
        pk_prefix: impl Row,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
        build_vnode_stream(
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn batch_iter_log_inner<K: CopyFromSlice>(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
    {
        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadLogOptions {
            table_id: self.table_id,
        };
        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
            &self.store,
            self.mapping.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
            start_epoch,
            end_epoch,
        )
        .await?
        .into_stream::<K>();

        Ok(iter)
    }

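    /// Iterates on the table with the given pk prefix and range bounds, returning
    /// [`DataChunk`]s of at most `chunk_size` rows.
    ///
    /// A chunked-scan sketch (hypothetical bindings; assumes `table` and a
    /// committed `epoch` are in scope):
    ///
    /// ```ignore
    /// let chunks = table
    ///     .batch_chunk_iter_with_pk_bounds(
    ///         HummockReadEpoch::Committed(epoch),
    ///         row::empty(), // no pk prefix: scan the whole table
    ///         ..,
    ///         true,         // rows ordered by pk
    ///         1024,         // chunk size
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// ```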
    pub async fn batch_chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
        let iter = self
            .chunk_iter_with_pk_bounds(
                epoch,
                pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                prefetch_options,
            )
            .await?;

        Ok(iter.map(|item| {
            let (columns, row_count) = item?;
            Ok(DataChunk::new(columns, row_count))
        }))
    }
}

/// [`BatchTableInnerIterInner`] iterates over the storage table.
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: SI,

    /// Maps the deserialized value columns to output positions.
    mapping: Arc<ColumnMapping>,

    /// The position of the epoch column in the output, if requested.
    epoch_idx: Option<usize>,

    /// Deserializer for the value part of the storage row.
    row_deserializer: Arc<SD>,

    /// Used for deserializing the primary key when output columns come from it.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    output_indices: Vec<usize>,

    /// Output column indices that are read from the key part of the storage row.
    key_output_indices: Option<Vec<usize>>,

    /// Output column indices that are read from the value part of the storage row.
    value_output_indices: Vec<usize>,

    /// Positions of the key output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,
}

impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
    /// Creates a new iterator over the given key range.
    #[allow(clippy::too_many_arguments)]
    async fn new<S>(
        store: &S,
        mapping: Arc<ColumnMapping>,
        epoch_idx: Option<usize>,
        pk_serializer: Option<Arc<OrderedRowSerde>>,
        output_indices: Vec<usize>,
        key_output_indices: Option<Vec<usize>>,
        value_output_indices: Vec<usize>,
        output_row_in_key_indices: Vec<usize>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self>
    where
        S: StateStoreRead<Iter = SI>,
    {
        let iter = store.iter(table_key_range, read_options).await?;
        let iter = Self {
            iter,
            mapping,
            epoch_idx,
            row_deserializer,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
        };
        Ok(iter)
    }

    /// Yields rows from the inner iterator, assembling each output row from the
    /// key part, the value part, and the epoch as needed.
    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
    async fn into_stream<K: CopyFromSlice>(mut self) {
        while let Some((k, v)) = self
            .iter
            .try_next()
            .instrument_await("storage_table_iter_next".verbose())
            .await?
        {
            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
            let row = self.row_deserializer.deserialize(value)?;
            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
            let row = match &self.key_output_indices {
                Some(key_output_indices) => {
                    let result_row_in_key = match self.pk_serializer.clone() {
                        Some(pk_serializer) => {
                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;

                            pk.project(&self.output_row_in_key_indices).into_owned_row()
                        }
                        None => OwnedRow::empty(),
                    };

                    let mut result_row_vec = vec![];
                    for idx in &self.output_indices {
                        if let Some(epoch_idx) = self.epoch_idx
                            && *idx == epoch_idx
                        {
                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                            result_row_vec
                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                        } else if self.value_output_indices.contains(idx) {
                            let item_position_in_value_indices = &self
                                .value_output_indices
                                .iter()
                                .position(|p| idx == p)
                                .unwrap();
                            result_row_vec.push(
                                result_row_in_value
                                    .datum_at(*item_position_in_value_indices)
                                    .to_owned_datum(),
                            );
                        } else {
                            let item_position_in_pk_indices =
                                key_output_indices.iter().position(|p| idx == p).unwrap();
                            result_row_vec.push(
                                result_row_in_key
                                    .datum_at(item_position_in_pk_indices)
                                    .to_owned_datum(),
                            );
                        }
                    }
                    OwnedRow::new(result_row_vec)
                }
                None => match &self.epoch_idx {
                    Some(epoch_idx) => {
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if idx == epoch_idx {
                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        OwnedRow::new(result_row_vec)
                    }
                    None => result_row_in_value.into_owned_row(),
                },
            };
            yield (K::copy_from_slice(table_key.as_ref()), row);
        }
    }
}

/// [`BatchTableInnerIterLogInner`] iterates over the change log of the storage table.
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw change-log entries from storage.
    iter: S::ChangeLogIter,

    /// Maps the deserialized value columns to output positions.
    mapping: Arc<ColumnMapping>,

    /// Deserializer for the value part of the storage row.
    row_deserializer: Arc<SD>,
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
    /// Creates a new log iterator, waiting until `end_epoch` is readable first.
    #[allow(clippy::too_many_arguments)]
    async fn new(
        store: &S,
        mapping: Arc<ColumnMapping>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadLogOptions,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
    ) -> StorageResult<Self> {
        store
            .try_wait_epoch(
                end_epoch,
                TryWaitEpochOptions {
                    table_id: read_options.table_id,
                },
            )
            .await?;
        let iter = store
            .iter_log(
                (start_epoch, end_epoch.get_epoch()),
                table_key_range,
                read_options,
            )
            .await?;
        let iter = Self {
            iter,
            mapping,
            row_deserializer,
        };
        Ok(iter)
    }

    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
        self.iter.into_stream(move |(table_key, value)| {
            value
                // Both the old and new value of a change-log entry go through the
                // same deserialization and output projection.
                .try_map(|value| {
                    let full_row = self.row_deserializer.deserialize(value)?;
                    let row = self
                        .mapping
                        .project(OwnedRow::new(full_row))
                        .into_owned_row();
                    Ok(row)
                })
                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
        })
    }
}