use std::future::Future;
use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;
use std::time::Duration;

use await_tree::{InstrumentAwait, SpanExt};
use bytes::{Bytes, BytesMut};
use foyer::Hint;
use futures::future::try_join_all;
use futures::{Stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_sdk::key::{
    CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
};
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;

mod vector_index_reader;
pub use vector_index_reader::VectorIndexReader;

use crate::StateStore;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{ColumnMapping, find_columns_by_ids};
use crate::store::timeout_auto_rebuild::iter_with_timeout_rebuild;
use crate::store::{
    NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
    StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
};
use crate::table::merge_sort::NodePeek;
use crate::table::{ChangeLogRow, KeyedRow, TableDistribution, TableIter};

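/// `BatchTableInner` is the interface for batch reads (point get and range scan) over the
/// relational data persisted in a [`StateStore`]. Rows are addressed by their
/// memcomparable-encoded primary key, prefixed with the [`VirtualNode`] that identifies the
/// partition the row belongs to.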
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id of this table.
    table_id: TableId,

    /// State store backend.
    store: S,

    /// Schema of the output columns, i.e., the columns visible to readers of this table.
    schema: Schema,

    /// Serializer of the primary key (memcomparable encoding).
    pk_serializer: OrderedRowSerde,

    /// Indices of the output columns within the table columns.
    output_indices: Vec<usize>,

    /// Output columns that are read from the key part of the storage row, if any.
    key_output_indices: Option<Vec<usize>>,

    /// Output columns that are read from the value part of the storage row.
    value_output_indices: Vec<usize>,

    /// Positions of the key output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,

    /// Mapping from the value row to the output row.
    mapping: Arc<ColumnMapping>,

    /// Position in the output of the column that is filled with the row's epoch, if requested.
    epoch_idx: Option<usize>,

    /// Serde for the value part of the storage row.
    row_serde: Arc<SD>,

    /// Indices of the primary key columns within the table columns.
    pk_indices: Vec<usize>,

    /// Vnode distribution of this table, used to locate rows across virtual nodes.
    distribution: TableDistribution,

    /// Table-level options, e.g., the data retention period.
    table_option: TableOption,

    /// Length of the primary-key prefix that forms the distribution key, used to derive
    /// prefix hints for reads.
    read_prefix_len_hint: usize,
}

/// The [`BatchTableInner`] instantiation used in production, with the value serde chosen at
/// runtime.
pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;

impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BatchTableInner").finish_non_exhaustive()
    }
}

impl<S: StateStore> BatchTableInner<S, EitherSerde> {
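    /// Create a [`BatchTableInner`] that reads a subset of the table's columns, given the
    /// catalog's `table_desc`, the `output_column_ids` to output, and the `vnodes` owned by
    /// this reader (interpreted by [`TableDistribution::new_from_storage_table_desc`]).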
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = table_desc.table_id;
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

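    /// Create a [`BatchTableInner`] with partial output columns and a singleton distribution,
    /// for testing only.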
    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

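    /// Create a [`BatchTableInner`] that outputs all of the table's columns, for testing only.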
    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

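    /// The shared constructor: classifies each output column as coming from the value part,
    /// the key part, or the epoch, and builds the serdes and column mappings accordingly.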
    #[allow(clippy::too_many_arguments)]
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        // Classify each output column: it is read from the value part, from the key part,
        // or (for at most one column) filled with the epoch of the row.
        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        let mut epoch_idx = None;

        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                // Versioned tables use the column-aware serde, which can decode rows written
                // under an older version of the schema.
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    pub fn pk_serializer(&self) -> &OrderedRowSerde {
        &self.pk_serializer
    }

    pub fn schema(&self) -> &Schema {
        &self.schema
    }

    pub fn pk_indices(&self) -> &[usize] {
        &self.pk_indices
    }

    pub fn output_indices(&self) -> &[usize] {
        &self.output_indices
    }

    /// Indices of the primary key columns within the output columns. Returns `None` if any
    /// primary key column is not included in the output.
    pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
        self.pk_indices
            .iter()
            .map(|&i| self.output_indices.iter().position(|&j| i == j))
            .collect()
    }

    pub fn table_id(&self) -> TableId {
        self.table_id
    }

    pub fn vnodes(&self) -> &Arc<Bitmap> {
        self.distribution.vnodes()
    }
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
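    /// Get a single row by point lookup on its primary key, after waiting for `wait_epoch`.
    /// Returns `Ok(None)` if the key does not exist. If the output contains an epoch column,
    /// it is filled from the epoch of the fetched key-value pair.
    ///
    /// An illustrative sketch (not a doctest from this crate); assumes a constructed
    /// `table: BatchTable<impl StateStore>` and a committed epoch `epoch: u64`:
    ///
    /// ```ignore
    /// let pk = OwnedRow::new(vec![Some(1i64.into())]);
    /// if let Some(row) = table.get_row(&pk, HummockReadEpoch::Committed(epoch)).await? {
    ///     // `row` contains only the configured output columns.
    /// }
    /// ```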
    pub async fn get_row(
        &self,
        pk: impl Row,
        wait_epoch: HummockReadEpoch,
    ) -> StorageResult<Option<OwnedRow>> {
        self.store
            .try_wait_epoch(
                wait_epoch,
                TryWaitEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await?;
        let serialized_pk = serialize_pk_with_vnode(
            &pk,
            &self.pk_serializer,
            self.distribution.compute_vnode_by_pk(&pk),
        );
        assert!(pk.len() <= self.pk_indices.len());

        let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
        {
            Some(serialized_pk.slice(VirtualNode::SIZE..))
        } else {
            None
        };

        let read_options = ReadOptions {
            prefix_hint,
            cache_policy: CachePolicy::Fill(Hint::Normal),
            ..Default::default()
        };
        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                    table_option: self.table_option,
                },
            )
            .await?;
        match read_snapshot
            .on_key_value(serialized_pk, read_options, move |key, value| {
                let row = self.row_serde.deserialize(value)?;
                Ok((key.epoch_with_gap.pure_epoch(), row))
            })
            .await?
        {
            Some((epoch, row)) => {
                let result_row_in_value = self.mapping.project(OwnedRow::new(row));

                // Assemble the output row from the key part, the value part, and (if
                // requested) the epoch column.
                match &self.key_output_indices {
                    Some(key_output_indices) => {
                        let result_row_in_key =
                            pk.project(&self.output_row_in_key_indices).into_owned_row();
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if let Some(epoch_idx) = self.epoch_idx
                                && *idx == epoch_idx
                            {
                                let epoch = Epoch::from(epoch);
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else if self.value_output_indices.contains(idx) {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            } else {
                                let item_position_in_pk_indices =
                                    key_output_indices.iter().position(|p| idx == p).unwrap();
                                result_row_vec.push(
                                    result_row_in_key
                                        .datum_at(item_position_in_pk_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        let result_row = OwnedRow::new(result_row_vec);
                        Ok(Some(result_row))
                    }
                    None => match &self.epoch_idx {
                        Some(epoch_idx) => {
                            let mut result_row_vec = vec![];
                            for idx in &self.output_indices {
                                if idx == epoch_idx {
                                    let epoch = Epoch::from(epoch);
                                    result_row_vec.push(risingwave_common::types::Datum::from(
                                        epoch.as_scalar(),
                                    ));
                                } else {
                                    let item_position_in_value_indices = &self
                                        .value_output_indices
                                        .iter()
                                        .position(|p| idx == p)
                                        .unwrap();
                                    result_row_vec.push(
                                        result_row_in_value
                                            .datum_at(*item_position_in_value_indices)
                                            .to_owned_datum(),
                                    );
                                }
                            }
                            let result_row = OwnedRow::new(result_row_vec);
                            Ok(Some(result_row))
                        }
                        None => Ok(Some(result_row_in_value.into_owned_row())),
                    },
                }
            }
            _ => Ok(None),
        }
    }

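    /// Update the vnode bitmap of this table, returning the previous vnode bitmap.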
    #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
    pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
        self.distribution.update_vnode_bitmap(new_vnodes)
    }
}

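/// Any `Send + Unpin` stream of rows can be used as a [`TableIter`].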
impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
    async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
        self.next().await.transpose()
    }
}

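/// Merges per-vnode streams into a single stream. A single vnode is passed through as-is;
/// multiple vnodes are either flattened without ordering guarantees, or merge-sorted by the
/// vnode-prefixed key when the caller requires ordered output.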
mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        Single(RowSt),
        Unordered(Vec<RowSt>),
        Ordered(Vec<KeyedRowSt>),
    }

    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    pub(super) type SortKeyType = Bytes;

    #[define_opaque(MergedVnodeStream)]
    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}

use merge_vnode_stream::*;

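/// Build one stream per vnode concurrently and merge them. `row_stream_fn` is used when a
/// single vnode or unordered output suffices; `keyed_row_stream_fn` is used when the output
/// must be merge-sorted by key across vnodes.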
async fn build_vnode_stream<
    R: Send,
    RowSt: Stream<Item = StorageResult<((), R)>> + Send,
    KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    RowStFut: Future<Output = StorageResult<RowSt>>,
    KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
>(
    row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
    keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
    vnodes: &[VirtualNode],
    ordered: bool,
) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
where
    KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
{
    let stream = match vnodes {
        [] => unreachable!(),
        // A single vnode's stream is already ordered by key.
        [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
        // Unordered scans can simply flatten the per-vnode streams.
        vnodes if !ordered => VnodeStreamType::Unordered(
            try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
        ),
        // Ordered scans must merge-sort the per-vnode streams by their keys.
        vnodes => VnodeStreamType::Ordered(
            try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
        ),
    };
    Ok(merge_stream(stream))
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
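    /// Iterate over the rows within the given encoded key range at `wait_epoch`, reading from
    /// every owned vnode unless `vnode_hint` pins the scan to a single vnode.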
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // If `vnode_hint` is set, only that single vnode needs to be accessed.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            // Otherwise, access all vnodes owned by this table reader.
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                    table_option: self.table_option,
                },
            )
            .await?;

        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

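    /// Iterate over a single vnode within the given encoded key range, yielding the table key
    /// (copied into `K`) along with each deserialized output row.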
    async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
        &self,
        read_snapshot: &S::ReadSnapshot,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
    {
        let (table_key_range, read_options, pk_serializer) =
            self.vnode_read_context(prefix_hint, encoded_key_range, vnode, prefetch_options);

        let iter = read_snapshot.iter(table_key_range, read_options).await?;
        Ok(self.iter_stream_from_state_store_iter::<K, _>(iter, pk_serializer))
    }

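    /// Compute the table key range, read options, and optional pk deserializer for scanning a
    /// single vnode.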
    fn vnode_read_context(
        &self,
        prefix_hint: Option<Bytes>,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
    ) -> (TableKeyRange, ReadOptions, Option<Arc<OrderedRowSerde>>) {
        let cache_policy = match &encoded_key_range {
            // Unbounded scans may read a large amount of data, so fill the block cache with
            // low priority to avoid polluting it.
            (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
            _ => CachePolicy::Fill(Hint::Normal),
        };

        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadOptions {
            prefix_hint,
            prefetch_options,
            cache_policy,
        };
        // The pk deserializer is only needed when some output columns come from the key part.
        let pk_serializer = match self.output_row_in_key_indices.is_empty() {
            true => None,
            false => Some(Arc::new(self.pk_serializer.clone())),
        };

        (table_key_range, read_options, pk_serializer)
    }

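    /// Convert a raw state-store iterator into a stream of `(key, output row)` pairs using
    /// this table's serdes and output mappings.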
    fn iter_stream_from_state_store_iter<K: CopyFromSlice, SI: StateStoreIter + Send>(
        &self,
        iter: SI,
        pk_serializer: Option<Arc<OrderedRowSerde>>,
    ) -> impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, SI, S, SD> {
        BatchTableInnerIterInner {
            iter,
            mapping: self.mapping.clone(),
            epoch_idx: self.epoch_idx,
            row_deserializer: self.row_serde.clone(),
            pk_serializer,
            output_indices: self.output_indices.clone(),
            key_output_indices: self.key_output_indices.clone(),
            value_output_indices: self.value_output_indices.clone(),
            output_row_in_key_indices: self.output_row_in_key_indices.clone(),
        }
        .into_stream::<K>()
    }

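    /// Serialize a primary-key range bound, chained after `pk_prefix`, into an encoded key
    /// bound.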
    fn serialize_pk_bound(
        &self,
        pk_prefix: impl Row,
        range_bound: Bound<&OwnedRow>,
        is_start_bound: bool,
    ) -> Bound<Bytes> {
        match range_bound {
            Included(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    Included(serialized_key)
                } else {
                    // The serialized key may be a proper prefix of the stored keys, so the end
                    // bound must cover all keys that start with it.
                    end_bound_of_prefix(&serialized_key)
                }
            }
            Excluded(k) => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
                let key = pk_prefix.chain(k);
                let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
                if is_start_bound {
                    // The encoded excluded-start bound may be a proper prefix of stored keys,
                    // so skip the whole prefix group by starting (inclusively) from its next
                    // key.
                    let next_serialized_key = next_key(&serialized_key);
                    assert!(!next_serialized_key.is_empty());
                    Included(Bytes::from(next_serialized_key))
                } else {
                    Excluded(serialized_key)
                }
            }
            Unbounded => {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
                if pk_prefix.is_empty() {
                    Unbounded
                } else if is_start_bound {
                    Included(serialized_pk_prefix)
                } else {
                    end_bound_of_prefix(&serialized_pk_prefix)
                }
            }
        }
    }

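    /// Iterate over rows whose primary key falls in `range_bounds`, chained after `pk_prefix`.
    /// A prefix hint for bloom filters is derived when the prefix covers the configured
    /// distribution key.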
    async fn iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        assert!(pk_prefix.len() <= self.pk_indices.len());
        let pk_prefix_indices = (0..pk_prefix.len())
            .map(|index| self.pk_indices[index])
            .collect_vec();

        let prefix_hint = if self.read_prefix_len_hint != 0
            && self.read_prefix_len_hint <= pk_prefix.len()
        {
            let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
                start_key
            } else {
                unreachable!()
            };
            let prefix_len = self
                .pk_serializer
                .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
            Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
        } else {
            trace!(
                "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
                self.table_id, pk_prefix, pk_prefix_indices
            );
            None
        };

        trace!(
            "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}",
            self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
        );

        self.iter_with_encoded_key_range(
            prefix_hint,
            (start_key, end_key),
            epoch,
            self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
            ordered,
            prefetch_options,
        )
        .await
    }

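    /// Batch a row stream into column arrays of at most `chunk_size` rows, yielding each batch
    /// together with its row count.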
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // Create the array builders lazily, so that an empty stream yields no chunk.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            // Yield the last, possibly partial, chunk.
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }

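    /// Like [`Self::iter_with_pk_bounds`], but batches the rows into column arrays of at most
    /// `chunk_size` rows.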
    async fn chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
        let iter = self
            .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await?;

        Ok(Self::convert_row_stream_to_array_vec_stream(
            iter,
            self.schema.clone(),
            chunk_size,
        ))
    }

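    /// Iterate over rows whose primary key lies within `range_bounds`, chained after
    /// `pk_prefix`, at the given `epoch`. When `ordered` is `true`, rows from different vnodes
    /// are merge-sorted so the output follows primary-key order.
    ///
    /// An illustrative sketch (not a doctest from this crate); assumes a constructed `table`,
    /// a committed epoch `epoch: u64`, and that `PrefetchOptions` provides a `Default` impl:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// let stream = table
    ///     .batch_iter_with_pk_bounds(
    ///         HummockReadEpoch::Committed(epoch),
    ///         row::empty(),
    ///         ..,
    ///         true,
    ///         PrefetchOptions::default(),
    ///     )
    ///     .await?;
    /// futures::pin_mut!(stream);
    /// while let Some(row) = stream.try_next().await? {
    ///     // consume `row`
    /// }
    /// ```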
    pub async fn batch_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
            .await
    }

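    /// Scan all rows of this table at the given `epoch`.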
    pub async fn batch_iter(
        &self,
        epoch: HummockReadEpoch,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
        self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
            .await
    }

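    /// Encode an optional resume key `start_pk` into an inclusive start bound.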
    fn start_bound_from_pk(&self, start_pk: Option<&OwnedRow>) -> Bound<Bytes> {
        if let Some(start_pk) = start_pk {
            let mut bytes = BytesMut::new();
            self.pk_serializer.serialize(start_pk, &mut bytes);
            let bytes = bytes.freeze();
            Included(bytes)
        } else {
            Unbounded
        }
    }

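    /// Scan a single vnode starting from the optional `start_pk`, periodically rebuilding the
    /// underlying state-store iterator every `rebuild_interval`.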
    pub async fn batch_iter_vnode(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
        rebuild_interval: Duration,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        assert!(
            !rebuild_interval.is_zero(),
            "rebuild_interval should be positive"
        );
        let start_bound = self.start_bound_from_pk(start_pk);
        let snapshot = Arc::new(
            self.store
                .new_read_snapshot(
                    epoch,
                    NewReadSnapshotOptions {
                        table_id: self.table_id,
                        table_option: self.table_option,
                    },
                )
                .await?,
        );
        let (table_key_range, read_options, pk_serializer) = self.vnode_read_context(
            None,
            (start_bound.as_ref(), Unbounded),
            vnode,
            prefetch_options,
        );
        let iter = iter_with_timeout_rebuild(
            snapshot,
            table_key_range,
            self.table_id,
            read_options,
            rebuild_interval,
        )
        .await?;
        let iter = self.iter_stream_from_state_store_iter::<(), _>(iter, pk_serializer);
        Ok(iter.map_ok(|(_, row)| row))
    }

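    /// Return this table's next epoch after `epoch`.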
    pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
        self.store
            .next_epoch(
                epoch,
                NextEpochOptions {
                    table_id: self.table_id,
                },
            )
            .await
    }

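    /// Iterate over the change log of a single vnode between `start_epoch` and `end_epoch`,
    /// starting from the optional `start_pk`.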
    pub async fn batch_iter_vnode_log(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
    {
        let start_bound = self.start_bound_from_pk(start_pk);
        let stream = self
            .batch_iter_log_inner::<()>(
                start_epoch,
                end_epoch,
                (start_bound.as_ref(), Unbounded),
                vnode,
            )
            .await?;
        Ok(stream.map_ok(|(_, row)| row))
    }

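    /// Iterate over the change log of all owned vnodes between `start_epoch` and `end_epoch`,
    /// restricted to `range_bounds` under `pk_prefix`; merge-sorted across vnodes if `ordered`.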
    pub async fn batch_iter_log_with_pk_bounds(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        ordered: bool,
        range_bounds: impl RangeBounds<OwnedRow>,
        pk_prefix: impl Row,
    ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
        let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
        let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
        let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
        build_vnode_stream(
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            |vnode| {
                self.batch_iter_log_inner(
                    start_epoch,
                    end_epoch,
                    (start_key.as_ref(), end_key.as_ref()),
                    vnode,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }

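    /// Build the change-log stream for a single vnode over the given encoded key range.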
    async fn batch_iter_log_inner<K: CopyFromSlice>(
        &self,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
        encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
        vnode: VirtualNode,
    ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
    {
        let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
        let read_options = ReadLogOptions {
            table_id: self.table_id,
        };
        let iter = BatchTableInnerIterLogInner::<S, SD>::new(
            &self.store,
            self.mapping.clone(),
            self.row_serde.clone(),
            table_key_range,
            read_options,
            start_epoch,
            end_epoch,
        )
        .await?
        .into_stream::<K>();

        Ok(iter)
    }

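    /// Like [`Self::batch_iter_with_pk_bounds`], but yields [`DataChunk`]s of at most
    /// `chunk_size` rows instead of individual rows.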
    pub async fn batch_chunk_iter_with_pk_bounds(
        &self,
        epoch: HummockReadEpoch,
        pk_prefix: impl Row,
        range_bounds: impl RangeBounds<OwnedRow>,
        ordered: bool,
        chunk_size: usize,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
        let iter = self
            .chunk_iter_with_pk_bounds(
                epoch,
                pk_prefix,
                range_bounds,
                ordered,
                chunk_size,
                prefetch_options,
            )
            .await?;

        Ok(iter.map(|item| {
            let (columns, row_count) = item?;
            Ok(DataChunk::new(columns, row_count))
        }))
    }
}

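/// [`BatchTableInnerIterInner`] iterates on the storage table, translating raw key-value
/// pairs from the state store into output rows.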
struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
    /// An iterator that returns raw bytes from storage.
    iter: SI,

    /// Mapping from the value row to the output row.
    mapping: Arc<ColumnMapping>,

    /// Position in the output of the column that is filled with the row's epoch, if requested.
    epoch_idx: Option<usize>,

    /// Serde to deserialize the value part of the storage row.
    row_deserializer: Arc<SD>,

    /// Deserializer for the key part, present only when some output columns come from the key.
    pk_serializer: Option<Arc<OrderedRowSerde>>,

    /// Indices of the output columns within the table columns.
    output_indices: Vec<usize>,

    /// Output columns that are read from the key part, if any.
    key_output_indices: Option<Vec<usize>>,

    /// Output columns that are read from the value part.
    value_output_indices: Vec<usize>,

    /// Positions of the key output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,
}

impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
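    /// Yield each output row together with its vnode-prefixed table key (copied into `K`).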
    #[try_stream(ok = (K, OwnedRow), error = StorageError)]
    async fn into_stream<K: CopyFromSlice>(mut self) {
        while let Some((k, v)) = self
            .iter
            .try_next()
            .instrument_await("storage_table_iter_next".verbose())
            .await?
        {
            let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
            let row = self.row_deserializer.deserialize(value)?;
            let result_row_in_value = self.mapping.project(OwnedRow::new(row));
            // Assemble the output row from the key part, the value part, and (if requested)
            // the epoch column.
            let row = match &self.key_output_indices {
                Some(key_output_indices) => {
                    let result_row_in_key = match self.pk_serializer.clone() {
                        Some(pk_serializer) => {
                            let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;

                            pk.project(&self.output_row_in_key_indices).into_owned_row()
                        }
                        None => OwnedRow::empty(),
                    };

                    let mut result_row_vec = vec![];
                    for idx in &self.output_indices {
                        if let Some(epoch_idx) = self.epoch_idx
                            && *idx == epoch_idx
                        {
                            let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                            result_row_vec
                                .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                        } else if self.value_output_indices.contains(idx) {
                            let item_position_in_value_indices = &self
                                .value_output_indices
                                .iter()
                                .position(|p| idx == p)
                                .unwrap();
                            result_row_vec.push(
                                result_row_in_value
                                    .datum_at(*item_position_in_value_indices)
                                    .to_owned_datum(),
                            );
                        } else {
                            let item_position_in_pk_indices =
                                key_output_indices.iter().position(|p| idx == p).unwrap();
                            result_row_vec.push(
                                result_row_in_key
                                    .datum_at(item_position_in_pk_indices)
                                    .to_owned_datum(),
                            );
                        }
                    }
                    OwnedRow::new(result_row_vec)
                }
                None => match &self.epoch_idx {
                    Some(epoch_idx) => {
                        let mut result_row_vec = vec![];
                        for idx in &self.output_indices {
                            if idx == epoch_idx {
                                let epoch = Epoch::from(epoch_with_gap.pure_epoch());
                                result_row_vec
                                    .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
                            } else {
                                let item_position_in_value_indices = &self
                                    .value_output_indices
                                    .iter()
                                    .position(|p| idx == p)
                                    .unwrap();
                                result_row_vec.push(
                                    result_row_in_value
                                        .datum_at(*item_position_in_value_indices)
                                        .to_owned_datum(),
                                );
                            }
                        }
                        OwnedRow::new(result_row_vec)
                    }
                    None => result_row_in_value.into_owned_row(),
                },
            };
            yield (K::copy_from_slice(table_key.as_ref()), row);
        }
    }
}

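/// [`BatchTableInnerIterLogInner`] iterates over the change log of the storage table,
/// yielding change rows.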
struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
    /// An iterator that returns raw change-log entries from storage.
    iter: S::ChangeLogIter,

    /// Mapping from the value row to the output row.
    mapping: Arc<ColumnMapping>,

    /// Serde to deserialize the value part of the storage row.
    row_deserializer: Arc<SD>,
}

impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
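    /// Create a change-log iterator, waiting until `end_epoch` is available first.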
    #[allow(clippy::too_many_arguments)]
    async fn new(
        store: &S,
        mapping: Arc<ColumnMapping>,
        row_deserializer: Arc<SD>,
        table_key_range: TableKeyRange,
        read_options: ReadLogOptions,
        start_epoch: u64,
        end_epoch: HummockReadEpoch,
    ) -> StorageResult<Self> {
        store
            .try_wait_epoch(
                end_epoch,
                TryWaitEpochOptions {
                    table_id: read_options.table_id,
                },
            )
            .await?;
        let iter = store
            .iter_log(
                (start_epoch, end_epoch.get_epoch()),
                table_key_range,
                read_options,
            )
            .await?;
        let iter = Self {
            iter,
            mapping,
            row_deserializer,
        };
        Ok(iter)
    }

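    /// Yield each change-log row together with its table key (copied into `K`).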
    fn into_stream<K: CopyFromSlice>(
        self,
    ) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
        self.iter.into_stream(move |(table_key, value)| {
            value
                .try_map(|value| {
                    let full_row = self.row_deserializer.deserialize(value)?;
                    let row = self
                        .mapping
                        .project(OwnedRow::new(full_row))
                        .into_owned_row();
                    Ok(row)
                })
                .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
        })
    }
}