use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::{Bytes, BytesMut};
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
};
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;

use crate::StateStore;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{ColumnMapping, find_columns_by_ids};
use crate::store::{
    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
    StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};

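/// `BatchTableInner` reads a table from a [`StateStore`] for batch queries: it supports
/// point-get ([`Self::get_row`]) and range scans, projecting storage rows onto the requested
/// output columns.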
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id of this table.
    table_id: TableId,

    /// State store backend.
    store: S,

    /// Schema of the output columns, i.e. the columns as seen by the reader.
    schema: Schema,

    /// Serializer/deserializer of the full primary key.
    pk_serializer: OrderedRowSerde,

    /// Indices of the output columns, in terms of the table's columns.
    output_indices: Vec<usize>,

    /// Output columns that must be read from the key part of the storage row
    /// (pk columns not present in the value), if any.
    key_output_indices: Option<Vec<usize>>,

    /// Output columns that are read from the value part of the storage row.
    value_output_indices: Vec<usize>,

    /// Positions of `key_output_indices` within the primary key.
    output_row_in_key_indices: Vec<usize>,

    /// Maps the deserialized value row onto the value part of the output row.
    mapping: Arc<ColumnMapping>,

    /// Index (in terms of the table's columns) of the hidden epoch (`_rw_timestamp`) column,
    /// if it is among the output columns.
    epoch_idx: Option<usize>,

    /// Serializer/deserializer of the value part of the storage row.
    row_serde: Arc<SD>,

    /// Indices of the primary key columns, in terms of the table's columns.
    pk_indices: Vec<usize>,

    /// The distribution (vnode bitmap) this reader is responsible for.
    distribution: TableDistribution,

    /// Table-level options, e.g. data retention.
    table_option: TableOption,

    /// Length of the key prefix usable as a read hint, in number of pk columns.
    read_prefix_len_hint: usize,
}

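/// The [`BatchTableInner`] instantiation used in practice, with the serde chosen at runtime
/// (column-aware for versioned tables, basic otherwise).
///
/// A construction-and-read sketch (`store`, `table_desc`, `pk`, and `epoch` are hypothetical):
///
/// ```ignore
/// let table = BatchTable::new_partial(store, output_column_ids, Some(vnodes), &table_desc);
/// let row = table.get_row(&pk, HummockReadEpoch::Committed(epoch)).await?;
/// ```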
pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BatchTableInner").finish_non_exhaustive()
    }
}

impl<S: StateStore> BatchTableInner<S, EitherSerde> {
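    /// Create a [`BatchTableInner`] that reads the subset of columns given by
    /// `output_column_ids`, using the catalog information in `table_desc`. `vnodes` is the set
    /// of vnodes this reader is responsible for (see
    /// [`TableDistribution::new_from_storage_table_desc`]).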
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = TableId {
            table_id: table_desc.table_id,
        };
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        // An output column that is in neither the value nor the key must be the hidden epoch
        // (`_rw_timestamp`) column, and there can be at most one of them.
        let mut epoch_idx = None;

        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    pub fn pk_serializer(&self) -> &OrderedRowSerde {
        &self.pk_serializer
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn output_indices(&self) -> &[usize] {
        &self.output_indices
    }

    /// Get the indices of the primary key columns in the output columns.
    ///
    /// Returns `None` if any primary key column is not among the output columns.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
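    /// Fetch a single row by its full primary key, waiting for `wait_epoch` to be readable
    /// first. The returned row is projected onto the output columns.
    ///
    /// A usage sketch (hypothetical pk and epoch values):
    ///
    /// ```ignore
    /// let pk = OwnedRow::new(vec![Some(1i32.into())]);
    /// if let Some(row) = table.get_row(&pk, HummockReadEpoch::Committed(epoch)).await? {
    ///     // `row` follows the order of `output_indices`.
    /// }
    /// ```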
    pub async fn get_row(
        &self,
        pk: impl Row,
        wait_epoch: HummockReadEpoch,
    ) -> StorageResult<Option<OwnedRow>> {
        self.store
            .try_wait_epoch(
                wait_epoch,
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let serialized_pk = serialize_pk_with_vnode(
            &pk,
            &self.pk_serializer,
            self.distribution.compute_vnode_by_pk(&pk),
        );
        assert!(pk.len() <= self.pk_indices.len());

        // Pass the serialized pk (without the vnode) as the prefix hint only when the hinted
        // prefix length matches the given pk length.
        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
        {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            retention_seconds: self.table_option.retention_seconds,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let row_serde = self.row_serde.clone();
        match read_snapshot
            .on_key_value(serialized_pk, read_options, move |key, value| {
                let row = row_serde.deserialize(value)?;
                Ok((key.epoch_with_gap.pure_epoch(), row))
            })
            .await?
        {
            Some((epoch, row)) => {
                let result_row_in_value = self.mapping.project(OwnedRow::new(row));

                match &self.key_output_indices {
                    Some(key_output_indices) => {
                        let result_row_in_key =
                            pk.project(&self.output_row_in_key_indices).into_owned_row();
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if let Some(epoch_idx) = self.epoch_idx
                                && *idx == epoch_idx
                            {
                                let epoch = Epoch::from(epoch);
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else if self.value_output_indices.contains(idx) {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            } else {
                                let item_position_in_pk_indices =
                                    key_output_indices.iter().position(|p| idx == p).unwrap();
                                result_row_vec.push(
                                    result_row_in_key
                                        .datum_at(item_position_in_pk_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        let result_row = OwnedRow::new(result_row_vec);
                        Ok(Some(result_row))
                    }
                    None => match &self.epoch_idx {
                        Some(epoch_idx) => {
                            let mut result_row_vec = vec![];
                            for idx in &self.output_indices {
                                if idx == epoch_idx {
                                    let epoch = Epoch::from(epoch);
                                    result_row_vec.push(risingwave_common::types::Datum::from(
                                        epoch.as_scalar(),
                                    ));
                                } else {
                                    let item_position_in_value_indices = &self
                                        .value_output_indices
                                        .iter()
                                        .position(|p| idx == p)
                                        .unwrap();
                                    result_row_vec.push(
                                        result_row_in_value
                                            .datum_at(*item_position_in_value_indices)
                                            .to_owned_datum(),
                                    );
                                }
                            }
                            let result_row = OwnedRow::new(result_row_vec);
                            Ok(Some(result_row))
                        }
                        None => Ok(Some(result_row_in_value.into_owned_row())),
                    },
                }
            }
            _ => Ok(None),
        }
    }

    /// Update the vnode bitmap of this table, returning the previous bitmap.
    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
}

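/// Any `Unpin` stream of [`OwnedRow`]s can serve as a [`TableIter`].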
impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
        self.next().await.transpose()
    }
}

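/// Merging of per-vnode row streams into one stream. A single-vnode stream is passed through
/// as is; multiple vnodes are either flattened without ordering or merge-sorted by their
/// encoded keys, depending on whether the caller requires key order.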
mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        /// A single vnode: no merging needed.
        Single(RowSt),
        /// Multiple vnodes, output order does not matter: flatten concurrently.
        Unordered(Vec<RowSt>),
        /// Multiple vnodes, output must follow key order: merge-sort by encoded key.
        Ordered(Vec<KeyedRowSt>),
    }

    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    pub(super) type SortKeyType = Bytes;

    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}

use merge_vnode_stream::*;

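/// Build a per-vnode stream for each vnode in `vnodes` and merge them. The unkeyed streams from
/// `row_stream_fn` are used when no ordering is required, or when there is a single vnode
/// (which needs no merging at all); otherwise the keyed streams from `keyed_row_stream_fn` are
/// merge-sorted by their encoded keys.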
async fn build_vnode_stream<
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let stream = match vnodes {
        [] => unreachable!(),
        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
        vnodes if !ordered => VnodeStreamType::Unordered(
            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
        ),
        vnodes => VnodeStreamType::Ordered(
            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
        ),
    };
    Ok(merge_stream(stream))
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Iterate over the given encoded key range, merging the per-vnode streams. If `vnode_hint`
    /// is given, only that vnode is scanned; otherwise all vnodes owned by this reader are.
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // If `vnode_hint` is set, only access that single vnode.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            // Otherwise, access all vnodes of this reader.
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;

        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
        &self,
        read_snapshot: &S::ReadSnapshot,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
    {
        let cache_policy = match &encoded_key_range {
            // Unbounded scans can be very large; fill the block cache with low priority to
            // avoid polluting it.
            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
            _ => CachePolicy::Fill(Hint::Normal),
        };

        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);

        {
            let prefix_hint = prefix_hint.clone();
            {
                let read_options = ReadOptions {
                    prefix_hint,
                    retention_seconds: self.table_option.retention_seconds,
                    prefetch_options,
                    cache_policy,
                };
                let pk_serializer = match self.output_row_in_key_indices.is_empty() {
                    true => None,
                    false => Some(Arc::new(self.pk_serializer.clone())),
                };
                let iter = BatchTableInnerIterInner::new(
                    read_snapshot,
                    self.mapping.clone(),
                    self.epoch_idx,
                    pk_serializer,
                    self.output_indices.clone(),
                    self.key_output_indices.clone(),
                    self.value_output_indices.clone(),
                    self.output_row_in_key_indices.clone(),
                    self.row_serde.clone(),
                    table_key_range,
                    read_options,
                )
                .await?
                .into_stream::<K>();
                Ok(iter)
            }
        }
    }
    fn serialize_pk_bound(
        &self,
        pk_prefix: impl Row,
        range_bound: Bound<&OwnedRow>,
        is_start_bound: bool,
    ) -> Bound<Bytes> {
        match range_bound {
            Included(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    Included(serialized_key)
                } else {
                    // For an inclusive end bound on a pk prefix, take the end bound of the whole
                    // prefix range, so rows with longer pks sharing the prefix are not missed.
                    end_bound_of_prefix(&serialized_key)
                }
            }
            Excluded(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    // The storage layer expects an inclusive start bound. `next_key` yields the
                    // smallest key greater than all keys prefixed with `serialized_key`, thus
                    // excluding the whole prefix.
                    let next_serialized_key = next_key(&serialized_key);
                    assert!(!next_serialized_key.is_empty());
                    Included(Bytes::from(next_serialized_key))
                } else {
                    Excluded(serialized_key)
                }
            }
            Unbounded => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
                if pk_prefix.is_empty() {
                    Unbounded
                } else if is_start_bound {
                    Included(serialized_pk_prefix)
                } else {
                    end_bound_of_prefix(&serialized_pk_prefix)
                }
            }
        }
    }

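    /// Iterate over rows whose primary key starts with `pk_prefix`, further restricted by
    /// `range_bounds` on the following pk column(s).
    ///
    /// A usage sketch (hypothetical values; datum construction abbreviated):
    ///
    /// ```ignore
    /// // rows with pk prefix `(1,)`, second pk column in `[10, 20)`
    /// let iter = table
    ///     .iter_with_pk_bounds(
    ///         epoch,
    ///         OwnedRow::new(vec![Some(1i32.into())]),
    ///         OwnedRow::new(vec![Some(10i32.into())])..OwnedRow::new(vec![Some(20i32.into())]),
    ///         true,
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// ```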
    async fn iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        assert!(pk_prefix.len() <= self.pk_indices.len());
        let pk_prefix_indices = (0..pk_prefix.len())
            .map(|index| self.pk_indices[index])
            .collect_vec();

        let prefix_hint = if self.read_prefix_len_hint != 0
            && self.read_prefix_len_hint <= pk_prefix.len()
        {
            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
                start_key
            } else {
                unreachable!()
            };
            let prefix_len = self
                .pk_serializer
                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
        } else {
            trace!(
                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
                self.table_id, pk_prefix, pk_prefix_indices
            );
            None
        };

        trace!(
            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}",
            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
        );

        self.iter_with_encoded_key_range(
            prefix_hint,
            (start_key, end_key),
            epoch,
            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
            ordered,
            prefetch_options,
        )
        .await
    }

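    /// Convert a stream of rows into a stream of `(columns, row_count)` batches: rows are
    /// appended to array builders and flushed every `chunk_size` rows, with one final partial
    /// batch if rows remain.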
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // Lazily create the builders, so that an empty stream yields no chunk at all.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        // Yield the last, possibly partial, chunk.
        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }

    async fn chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
        let iter = self
            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await?;

        Ok(Self::convert_row_stream_to_array_vec_stream(
            iter,
            self.schema.clone(),
            chunk_size,
        ))
    }

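    /// Iterate over rows under `epoch` whose primary key starts with `pk_prefix`, further
    /// restricted by `range_bounds` on the following pk column(s). With `ordered`, rows are
    /// returned in pk order across vnodes.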
    pub async fn batch_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await
    }

    /// Iterate over all rows of the table under `epoch`.
    pub async fn batch_iter(
        &self,
        epoch: HummockReadEpoch,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
            .await
    }

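    /// Iterate over the rows of a single `vnode` under `epoch`, starting from `start_pk`
    /// (inclusive) if given.
    ///
    /// A resumption sketch (hypothetical values):
    ///
    /// ```ignore
    /// // continue a per-vnode scan from a previously recorded pk
    /// let stream = table
    ///     .batch_iter_vnode(epoch, Some(&resume_pk), vnode, PrefetchOptions::default())
    ///     .await?;
    /// ```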
    pub async fn batch_iter_vnode(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        Ok(self
            .iter_vnode_with_encoded_key_range::<()>(
                &read_snapshot,
                None,
                (start_bound.as_ref(), Unbounded),
                vnode,
                prefetch_options,
            )
            .await?
            .map_ok(|(_, row)| row))
    }

    /// Get the next epoch to read after `epoch` for this table.
    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
        self.store
            .next_epoch(
                epoch,
                NextEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

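    /// Iterate over the change log of a single `vnode` between `start_epoch` and `end_epoch`,
    /// starting from `start_pk` (inclusive) if given.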
    pub async fn batch_iter_vnode_log(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        };
        let stream = self
            .batch_iter_log_inner::<()>(
                start_epoch,
                end_epoch,
                (start_bound.as_ref(), Unbounded),
                vnode,
            )
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

    /// Iterate over the change log of all vnodes between `start_epoch` and `end_epoch`,
    /// restricted by `pk_prefix` and `range_bounds`.
    pub async fn batch_iter_log_with_pk_bounds(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        ordered: bool,
        range_bounds: impl RangeBounds<OwnedRow>,
        pk_prefix: impl Row,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
        build_vnode_stream(
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

    async fn batch_iter_log_inner<K: CopyFromSlice>(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
    {
        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadLogOptions {
            table_id: self.table_id,
        };
        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
            &self.store,
            self.mapping.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
            start_epoch,
            end_epoch,
        )
        .await?
        .into_stream::<K>();

        Ok(iter)
    }

    /// Iterate over rows as [`DataChunk`]s of at most `chunk_size` rows each.
    pub async fn batch_chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
        let iter = self
            .chunk_iter_with_pk_bounds(
                epoch,
                pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                prefetch_options,
            )
            .await?;

        Ok(iter.map(|item| {
            let (columns, row_count) = item?;
            Ok(DataChunk::new(columns, row_count))
        }))
    }
}

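/// [`StateStoreIter`]-backed iterator that deserializes raw key-value pairs into output rows,
/// mirroring the projection logic of [`BatchTableInner::get_row`].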
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: SI,

    /// Maps the deserialized value row onto the value part of the output row.
    mapping: Arc<ColumnMapping>,

    /// Index (in terms of the table's columns) of the hidden epoch column, if requested.
    epoch_idx: Option<usize>,

    /// Deserializer of the value part of the storage row.
    row_deserializer: Arc<SD>,

    /// Deserializer of the key part; present only when some output columns come from the key.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    output_indices: Vec<usize>,

    key_output_indices: Option<Vec<usize>>,

    value_output_indices: Vec<usize>,

    output_row_in_key_indices: Vec<usize>,
}

impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
    #[allow(clippy::too_many_arguments)]
    async fn new<S>(
        store: &S,
        mapping: Arc<ColumnMapping>,
        epoch_idx: Option<usize>,
        pk_serializer: Option<Arc<OrderedRowSerde>>,
        output_indices: Vec<usize>,
        key_output_indices: Option<Vec<usize>>,
        value_output_indices: Vec<usize>,
        output_row_in_key_indices: Vec<usize>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadOptions,
    ) -> StorageResult<Self>
    where
        S: StateStoreRead<Iter = SI>,
    {
        let iter = store.iter(table_key_range, read_options).await?;
        let iter = Self {
            iter,
            mapping,
            epoch_idx,
            row_deserializer,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
        };
        Ok(iter)
    }

    /// Yield each row together with its vnode-prefixed key.
    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
    async fn into_stream<K: CopyFromSlice>(mut self) {
        while let Some((k, v)) = self
            .iter
            .try_next()
            .instrument_await("storage_table_iter_next".verbose())
            .await?
        {
            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
            let row = self.row_deserializer.deserialize(value)?;
            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
            let row = match &self.key_output_indices {
                Some(key_output_indices) => {
                    let result_row_in_key = match self.pk_serializer.clone() {
                        Some(pk_serializer) => {
                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;

                            pk.project(&self.output_row_in_key_indices).into_owned_row()
                        }
                        None => OwnedRow::empty(),
                    };

                    let mut result_row_vec = vec![];
                    for idx in &self.output_indices {
                        if let Some(epoch_idx) = self.epoch_idx
                            && *idx == epoch_idx
                        {
                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                            result_row_vec
                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                        } else if self.value_output_indices.contains(idx) {
                            let item_position_in_value_indices = &self
                                .value_output_indices
                                .iter()
                                .position(|p| idx == p)
                                .unwrap();
                            result_row_vec.push(
                                result_row_in_value
                                    .datum_at(*item_position_in_value_indices)
                                    .to_owned_datum(),
                            );
                        } else {
                            let item_position_in_pk_indices =
                                key_output_indices.iter().position(|p| idx == p).unwrap();
                            result_row_vec.push(
                                result_row_in_key
                                    .datum_at(item_position_in_pk_indices)
                                    .to_owned_datum(),
                            );
                        }
                    }
                    OwnedRow::new(result_row_vec)
                }
                None => match &self.epoch_idx {
                    Some(epoch_idx) => {
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if idx == epoch_idx {
                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        OwnedRow::new(result_row_vec)
                    }
                    None => result_row_in_value.into_owned_row(),
                },
            };
            yield (K::copy_from_slice(table_key.as_ref()), row);
        }
    }
}

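/// Iterator over the change log of a table, backed by the state store's change-log iterator.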
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw change-log entries from storage.
    iter: S::ChangeLogIter,

    /// Maps the deserialized value row onto the value part of the output row.
    mapping: Arc<ColumnMapping>,

    /// Deserializer of the value part of the storage row.
    row_deserializer: Arc<SD>,
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
    #[allow(clippy::too_many_arguments)]
    async fn new(
        store: &S,
        mapping: Arc<ColumnMapping>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadLogOptions,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
    ) -> StorageResult<Self> {
        store
            .try_wait_epoch(
                end_epoch,
                TryWaitEpochOptions {
                    table_id: read_options.table_id,
                },
            )
            .await?;
        let iter = store
            .iter_log(
                (start_epoch, end_epoch.get_epoch()),
                table_key_range,
                read_options,
            )
            .await?;
        let iter = Self {
            iter,
            mapping,
            row_deserializer,
        };
        Ok(iter)
    }

    fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
        self.iter.into_stream(move |(table_key, value)| {
            value
                .try_map(|value| {
                    let full_row = self.row_deserializer.deserialize(value)?;
                    let row = self
                        .mapping
                        .project(OwnedRow::new(full_row))
                        .into_owned_row();
                    Ok(row)
                })
                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
        })
    }
}