1use std::future::Future;
16use std::ops::Bound::{self, Excluded, Included, Unbounded};
17use std::ops::RangeBounds;
18use std::sync::Arc;
19use std::time::Duration;
20
21use await_tree::{InstrumentAwait, SpanExt};
22use bytes::{Bytes, BytesMut};
23use foyer::Hint;
24use futures::future::try_join_all;
25use futures::{Stream, StreamExt, TryStreamExt};
26use futures_async_stream::try_stream;
27use itertools::Itertools;
28use more_asserts::assert_gt;
29use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
30use risingwave_common::bitmap::Bitmap;
31use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
32use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
33use risingwave_common::row::{self, OwnedRow, Row, RowExt};
34use risingwave_common::types::{DataType, ToOwnedDatum};
35use risingwave_common::util::epoch::Epoch;
36use risingwave_common::util::row_serde::*;
37use risingwave_common::util::sort_util::OrderType;
38use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
39use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde, deserialize_datum};
40use risingwave_hummock_sdk::HummockReadEpoch;
41use risingwave_hummock_sdk::key::{
42 CopyFromSlice, TableKeyRange, end_bound_of_prefix, next_key, prefixed_range_with_vnode,
43};
44use risingwave_pb::batch_plan::{PbScanRange, scan_range};
45use risingwave_pb::plan_common::StorageTableDesc;
46use tracing::trace;
47mod vector_index_reader;
48pub use vector_index_reader::VectorIndexReader;
49
50use crate::StateStore;
51use crate::error::{StorageError, StorageResult};
52use crate::hummock::CachePolicy;
53use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
54use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
55use crate::row_serde::{ColumnMapping, find_columns_by_ids};
56use crate::store::timeout_auto_rebuild::iter_with_timeout_rebuild;
57use crate::store::{
58 NewReadSnapshotOptions, NextEpochOptions, PrefetchOptions, ReadLogOptions, ReadOptions,
59 StateStoreGet, StateStoreIter, StateStoreIterExt, StateStoreRead, TryWaitEpochOptions,
60};
61use crate::table::merge_sort::NodePeek;
62use crate::table::{
63 ChangeLogRow, KeyedRow, TableDistribution, TableIter, should_calculate_prefix_hint,
64};
65
/// A `(start, end)` pair of bounds over primary-key rows, in the same shape that
/// `RangeBounds<OwnedRow>` exposes.
pub type PkRangeBounds = (Bound<OwnedRow>, Bound<OwnedRow>);

/// A primary-key scan range.
///
/// It consists of an equality prefix on the leading pk columns, followed by a range on the pk
/// columns that come right after that prefix.
#[derive(Clone, Debug)]
pub struct PkScanRange {
    /// Values the leading primary-key columns must be equal to.
    pub pk_prefix: OwnedRow,

    /// Bounds on the pk columns following `pk_prefix`.
    ///
    /// `PkScanRange::new` stores them as `(lower, upper)` in value order.
    /// `convert_to_range_bounds` rearranges them into storage key order, swapping them for a
    /// descending column.
    pub range_bounds: PkRangeBounds,
}
80
81impl PkScanRange {
82 fn pk_type_at(pk_types: &[DataType], index: usize) -> StorageResult<&DataType> {
83 pk_types.get(index).ok_or_else(|| {
84 StorageError::from(memcomparable::Error::Message(format!(
85 "invalid scan range: primary key index {} exceeds primary key length {}",
86 index,
87 pk_types.len()
88 )))
89 })
90 }
91
92 pub fn new(scan_range: PbScanRange, pk_types: Vec<DataType>) -> StorageResult<Self> {
94 let mut index = 0;
95 let pk_prefix = OwnedRow::new(
96 scan_range
97 .eq_conds
98 .iter()
99 .map(|v| -> StorageResult<_> {
100 let ty = Self::pk_type_at(&pk_types, index)?;
101 index += 1;
102 Ok(deserialize_datum(v.as_slice(), ty)?)
103 })
104 .try_collect()?,
105 );
106 if scan_range.lower_bound.is_none() && scan_range.upper_bound.is_none() {
107 return Ok(Self {
108 pk_prefix,
109 ..Self::full()
110 });
111 }
112
113 let build_bound =
114 |bound: &scan_range::Bound, mut index| -> StorageResult<Bound<OwnedRow>> {
115 let range_bound = OwnedRow::new(
116 bound
117 .value
118 .iter()
119 .map(|v| -> StorageResult<_> {
120 let ty = Self::pk_type_at(&pk_types, index)?;
121 index += 1;
122 Ok(deserialize_datum(v.as_slice(), ty)?)
123 })
124 .try_collect()?,
125 );
126 if bound.inclusive {
127 Ok(Bound::Included(range_bound))
128 } else {
129 Ok(Bound::Excluded(range_bound))
130 }
131 };
132
133 let range_bounds = match (
134 scan_range.lower_bound.as_ref(),
135 scan_range.upper_bound.as_ref(),
136 ) {
137 (Some(lb), Some(ub)) => (build_bound(lb, index)?, build_bound(ub, index)?),
138 (None, Some(ub)) => (Bound::Unbounded, build_bound(ub, index)?),
139 (Some(lb), None) => (build_bound(lb, index)?, Bound::Unbounded),
140 (None, None) => unreachable!(),
141 };
142 Ok(Self {
143 pk_prefix,
144 range_bounds,
145 })
146 }
147
148 pub fn full() -> Self {
150 Self {
151 pk_prefix: OwnedRow::default(),
152 range_bounds: (Bound::Unbounded, Bound::Unbounded),
153 }
154 }
155
156 pub fn build_from_protobuf(
157 scan_ranges: &[PbScanRange],
158 table_desc: &StorageTableDesc,
159 ) -> StorageResult<Vec<Self>> {
160 if scan_ranges.is_empty() {
161 Ok(vec![Self::full()])
162 } else {
163 scan_ranges
164 .iter()
165 .map(|scan_range| Self::from_protobuf(scan_range, table_desc))
166 .try_collect()
167 }
168 }
169
170 pub fn from_protobuf(
171 scan_range: &PbScanRange,
172 table_desc: &StorageTableDesc,
173 ) -> StorageResult<Self> {
174 let pk_types = table_desc
175 .pk
176 .iter()
177 .map(|order| {
178 DataType::from(
179 table_desc.columns[order.column_index as usize]
180 .column_type
181 .as_ref()
182 .unwrap(),
183 )
184 })
185 .collect_vec();
186 Self::new(scan_range.clone(), pk_types)
187 }
188
189 pub fn convert_to_range_bounds<S: StateStore, SD: ValueRowSerde>(
190 self,
191 table: &BatchTableInner<S, SD>,
192 ) -> PkRangeBounds {
193 let PkScanRange {
194 pk_prefix,
195 range_bounds,
196 } = self;
197
198 let Some(order_type) = table.pk_serializer().get_order_types().get(pk_prefix.len()) else {
200 return range_bounds;
201 };
202 let (start_bound, end_bound) = if order_type.is_ascending() {
203 (range_bounds.0, range_bounds.1)
204 } else {
205 (range_bounds.1, range_bounds.0)
206 };
207
208 let start_bound_is_bounded = !matches!(start_bound, Bound::Unbounded);
209 let end_bound_is_bounded = !matches!(end_bound, Bound::Unbounded);
210
211 let build_bound = |other_bound_is_bounded: bool, bound, order_type_nulls| match bound {
212 Bound::Unbounded => {
213 if other_bound_is_bounded && order_type_nulls {
214 Bound::Excluded(OwnedRow::new(vec![None]))
217 } else {
218 Bound::Unbounded
220 }
221 }
222 Bound::Included(x) => Bound::Included(x),
223 Bound::Excluded(x) => Bound::Excluded(x),
224 };
225 let start_bound = build_bound(
226 end_bound_is_bounded,
227 start_bound,
228 order_type.nulls_are_first(),
229 );
230 let end_bound = build_bound(
231 start_bound_is_bounded,
232 end_bound,
233 order_type.nulls_are_last(),
234 );
235 (start_bound, end_bound)
236 }
237}
238
#[cfg(test)]
mod tests {
    use super::*;

    /// `PkScanRange::new` must report an error when a scan range carries more values than the
    /// table has primary-key columns. This covers both equality conditions and range bounds.
    #[test]
    fn test_pk_scan_range_rejects_out_of_bounds_pk_index() {
        // One equality value against a table with no pk columns.
        let scan_range = PbScanRange {
            eq_conds: vec![vec![]],
            ..Default::default()
        };
        assert!(PkScanRange::new(scan_range, vec![]).is_err());

        // One lower-bound value against a table with no pk columns.
        let scan_range = PbScanRange {
            lower_bound: Some(scan_range::Bound {
                value: vec![vec![]],
                inclusive: false,
            }),
            ..Default::default()
        };
        assert!(PkScanRange::new(scan_range, vec![]).is_err());
    }
}
261
/// A storage-table handle for batch reads: point gets and range scans over a set of vnodes,
/// with rows projected to a subset of the table's columns.
#[derive(Clone)]
pub struct BatchTableInner<S: StateStore, SD: ValueRowSerde> {
    /// Id of the table; passed to the state store on every read.
    table_id: TableId,

    /// The state store to read from.
    store: S,

    /// Schema of the output rows, i.e. of the columns selected by `output_indices`.
    schema: Schema,

    /// Serializer for primary-key rows, built from the pk columns' data and order types.
    pk_serializer: OrderedRowSerde,

    /// Table-column indices of the output columns, in output order.
    output_indices: Vec<usize>,

    /// Output columns (as table-column indices) that are pk columns but not value columns, so
    /// they must be recovered from the key. `None` if there are none.
    key_output_indices: Option<Vec<usize>>,

    /// Output columns (as table-column indices) that are stored in the value.
    value_output_indices: Vec<usize>,

    /// For each entry of `key_output_indices`, its position within `pk_indices`.
    output_row_in_key_indices: Vec<usize>,

    /// Projects a deserialized value row onto `value_output_indices`.
    mapping: Arc<ColumnMapping>,

    /// The output column (as table-column index) that is neither a value nor a pk column. It is
    /// filled with the epoch of the stored entry.
    epoch_idx: Option<usize>,

    /// Deserializer for stored values.
    row_serde: Arc<SD>,

    /// Table-column indices of the primary key, in pk order.
    pk_indices: Vec<usize>,

    /// Vnode distribution: which vnodes this instance reads and how pks map to vnodes.
    distribution: TableDistribution,

    /// Table options (retention) passed when opening read snapshots.
    table_option: TableOption,

    /// Number of leading pk columns used to compute read prefix hints. A value of 0
    /// presumably disables prefix hints.
    read_prefix_len_hint: usize,
}

/// A `BatchTableInner` whose value serde (basic or column-aware) is chosen at construction time.
pub type BatchTable<S> = BatchTableInner<S, EitherSerde>;
316
317impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for BatchTableInner<S, SD> {
318 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
319 f.debug_struct("BatchTableInner").finish_non_exhaustive()
320 }
321}
322
impl<S: StateStore> BatchTableInner<S, EitherSerde> {
    /// Creates a batch table from a `StorageTableDesc`.
    ///
    /// Only the columns listed in `output_column_ids` are output.
    /// `vnodes` is forwarded to `TableDistribution::new_from_storage_table_desc`. Presumably
    /// `None` selects that function's default vnode set; confirm there.
    ///
    /// # Panics
    /// Panics if a pk entry of `table_desc` has no valid order type, and on the invariants
    /// asserted by `new_inner`.
    pub fn new_partial(
        store: S,
        output_column_ids: Vec<ColumnId>,
        vnodes: Option<Arc<Bitmap>>,
        table_desc: &StorageTableDesc,
    ) -> Self {
        let table_id = table_desc.table_id;
        let column_descs = table_desc
            .columns
            .iter()
            .map(ColumnDesc::from)
            .collect_vec();
        let order_types: Vec<OrderType> = table_desc
            .pk
            .iter()
            .map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
            .collect();

        let pk_indices = table_desc
            .pk
            .iter()
            .map(|k| k.column_index as usize)
            .collect_vec();

        let table_option = TableOption {
            retention_seconds: table_desc.retention_seconds,
        };
        let value_indices = table_desc
            .get_value_indices()
            .iter()
            .map(|&k| k as usize)
            .collect_vec();
        let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
        let versioned = table_desc.versioned;
        let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);

        Self::new_inner(
            store,
            table_id,
            column_descs,
            output_column_ids,
            order_types,
            pk_indices,
            distribution,
            table_option,
            value_indices,
            prefix_hint_len,
            versioned,
        )
    }

    /// Test helper: builds a table that outputs only `output_column_ids`.
    ///
    /// The table uses a singleton distribution, default table options, no read prefix hint and
    /// a non-versioned value encoding.
    pub fn for_test_with_partial_columns(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        Self::new_inner(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            TableDistribution::singleton(),
            Default::default(),
            value_indices,
            0,
            false,
        )
    }

    /// Test helper: like `for_test_with_partial_columns`, but outputs every column in `columns`.
    pub fn for_test(
        store: S,
        table_id: TableId,
        columns: Vec<ColumnDesc>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        value_indices: Vec<usize>,
    ) -> Self {
        let output_column_ids = columns.iter().map(|c| c.column_id).collect();
        Self::for_test_with_partial_columns(
            store,
            table_id,
            columns,
            output_column_ids,
            order_types,
            pk_indices,
            value_indices,
        )
    }

    /// Shared constructor behind the public builders.
    ///
    /// Each output column is classified as one of:
    /// - a value column, if it is in `value_indices`;
    /// - a key-only column, if it is in `pk_indices` but not in `value_indices`;
    /// - otherwise, the single epoch column.
    ///
    /// When `versioned` is true, values are read with a `ColumnAwareSerde` restricted to the
    /// de-duplicated value output columns. Otherwise a `BasicSerde` over all `value_indices` is
    /// used. Either way, `mapping` projects the deserialized value row onto
    /// `value_output_indices`.
    ///
    /// # Panics
    /// - If `order_types` and `pk_indices` differ in length.
    /// - If more than one output column is neither a value nor a pk column.
    #[allow(clippy::too_many_arguments)] // Single internal constructor; callers pass everything explicitly.
    fn new_inner(
        store: S,
        table_id: TableId,
        table_columns: Vec<ColumnDesc>,
        output_column_ids: Vec<ColumnId>,
        order_types: Vec<OrderType>,
        pk_indices: Vec<usize>,
        distribution: TableDistribution,
        table_option: TableOption,
        value_indices: Vec<usize>,
        read_prefix_len_hint: usize,
        versioned: bool,
    ) -> Self {
        assert_eq!(order_types.len(), pk_indices.len());

        let (output_columns, output_indices) =
            find_columns_by_ids(&table_columns, &output_column_ids);

        let mut value_output_indices = vec![];
        let mut key_output_indices = vec![];
        let mut epoch_idx = None;

        // Classify every output column; value membership takes priority over pk membership.
        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                // Anything else is the epoch column, and there can be only one.
                assert!(epoch_idx.is_none());
                epoch_idx = Some(*idx);
            }
        }

        // Position of each key-only output column within the pk.
        let output_row_in_key_indices = key_output_indices
            .iter()
            .map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
            .collect_vec();
        let schema = Schema::new(output_columns.iter().map(Into::into).collect());

        let pk_data_types = pk_indices
            .iter()
            .map(|i| table_columns[*i].data_type.clone())
            .collect();
        let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
        let (row_serde, mapping) = {
            if versioned {
                // Deserialize each distinct value output column once, then fan out via mapping.
                let value_output_indices_dedup = value_output_indices
                    .iter()
                    .unique()
                    .copied()
                    .collect::<Vec<_>>();
                let output_row_in_value_output_indices_dedup = value_output_indices
                    .iter()
                    .map(|&di| {
                        value_output_indices_dedup
                            .iter()
                            .position(|&pi| di == pi)
                            .unwrap()
                    })
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
                let serde =
                    ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
                (serde.into(), mapping)
            } else {
                // Deserialize the full value row, then pick the output columns via mapping.
                let output_row_in_value_indices = value_output_indices
                    .iter()
                    .map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
                    .collect_vec();
                let mapping = ColumnMapping::new(output_row_in_value_indices);
                let serde = BasicSerde::new(value_indices.into(), table_columns.into());
                (serde.into(), mapping)
            }
        };

        let key_output_indices = match key_output_indices.is_empty() {
            true => None,
            false => Some(key_output_indices),
        };
        Self {
            table_id,
            store,
            schema,
            pk_serializer,
            output_indices,
            key_output_indices,
            value_output_indices,
            output_row_in_key_indices,
            mapping: Arc::new(mapping),
            epoch_idx,
            row_serde: Arc::new(row_serde),
            pk_indices,
            distribution,
            table_option,
            read_prefix_len_hint,
        }
    }
}
530
531impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
532 pub fn pk_serializer(&self) -> &OrderedRowSerde {
533 &self.pk_serializer
534 }
535
536 pub fn schema(&self) -> &Schema {
537 &self.schema
538 }
539
540 pub fn pk_indices(&self) -> &[usize] {
541 &self.pk_indices
542 }
543
544 pub fn output_indices(&self) -> &[usize] {
545 &self.output_indices
546 }
547
548 pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
552 self.pk_indices
553 .iter()
554 .map(|&i| self.output_indices.iter().position(|&j| i == j))
555 .collect()
556 }
557
558 pub fn table_id(&self) -> TableId {
559 self.table_id
560 }
561
562 pub fn vnodes(&self) -> &Arc<Bitmap> {
563 self.distribution.vnodes()
564 }
565}
566impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
568 pub async fn get_row(
570 &self,
571 pk: impl Row,
572 wait_epoch: HummockReadEpoch,
573 ) -> StorageResult<Option<OwnedRow>> {
574 self.store
575 .try_wait_epoch(
576 wait_epoch,
577 TryWaitEpochOptions {
578 table_id: self.table_id,
579 },
580 )
581 .await?;
582 let serialized_pk = serialize_pk_with_vnode(
583 &pk,
584 &self.pk_serializer,
585 self.distribution.compute_vnode_by_pk(&pk),
586 );
587 assert!(pk.len() <= self.pk_indices.len());
588
589 let prefix_hint =
590 if should_calculate_prefix_hint(self.read_prefix_len_hint, pk.len(), false) {
591 Some(serialized_pk.slice(VirtualNode::SIZE..))
592 } else {
593 None
594 };
595
596 let read_options = ReadOptions {
597 prefix_hint,
598 cache_policy: CachePolicy::Fill(Hint::Normal),
599 ..Default::default()
600 };
601 let read_snapshot = self
602 .store
603 .new_read_snapshot(
604 wait_epoch,
605 NewReadSnapshotOptions {
606 table_id: self.table_id,
607 table_option: self.table_option,
608 },
609 )
610 .await?;
611 match read_snapshot
612 .on_key_value(serialized_pk, read_options, move |key, value| {
613 let row = self.row_serde.deserialize(value)?;
614 Ok((key.epoch_with_gap.pure_epoch(), row))
615 })
616 .await?
617 {
618 Some((epoch, row)) => {
619 let result_row_in_value = self.mapping.project(OwnedRow::new(row));
620
621 match &self.key_output_indices {
622 Some(key_output_indices) => {
623 let result_row_in_key =
624 pk.project(&self.output_row_in_key_indices).into_owned_row();
625 let mut result_row_vec = vec![];
626 for idx in &self.output_indices {
627 if let Some(epoch_idx) = self.epoch_idx
628 && *idx == epoch_idx
629 {
630 let epoch = Epoch::from(epoch);
631 result_row_vec
632 .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
633 } else if self.value_output_indices.contains(idx) {
634 let item_position_in_value_indices = &self
635 .value_output_indices
636 .iter()
637 .position(|p| idx == p)
638 .unwrap();
639 result_row_vec.push(
640 result_row_in_value
641 .datum_at(*item_position_in_value_indices)
642 .to_owned_datum(),
643 );
644 } else {
645 let item_position_in_pk_indices =
646 key_output_indices.iter().position(|p| idx == p).unwrap();
647 result_row_vec.push(
648 result_row_in_key
649 .datum_at(item_position_in_pk_indices)
650 .to_owned_datum(),
651 );
652 }
653 }
654 let result_row = OwnedRow::new(result_row_vec);
655 Ok(Some(result_row))
656 }
657 None => match &self.epoch_idx {
658 Some(epoch_idx) => {
659 let mut result_row_vec = vec![];
660 for idx in &self.output_indices {
661 if idx == epoch_idx {
662 let epoch = Epoch::from(epoch);
663 result_row_vec.push(risingwave_common::types::Datum::from(
664 epoch.as_scalar(),
665 ));
666 } else {
667 let item_position_in_value_indices = &self
668 .value_output_indices
669 .iter()
670 .position(|p| idx == p)
671 .unwrap();
672 result_row_vec.push(
673 result_row_in_value
674 .datum_at(*item_position_in_value_indices)
675 .to_owned_datum(),
676 );
677 }
678 }
679 let result_row = OwnedRow::new(result_row_vec);
680 Ok(Some(result_row))
681 }
682 None => Ok(Some(result_row_in_value.into_owned_row())),
683 },
684 }
685 }
686 _ => Ok(None),
687 }
688 }
689
690 #[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
692 pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
693 self.distribution.update_vnode_bitmap(new_vnodes)
694 }
695}
696
697impl<S: Stream<Item = StorageResult<OwnedRow>> + Send + Unpin> TableIter for S {
700 async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
701 self.next().await.transpose()
702 }
703}
704
/// Combining of per-vnode row streams into a single row stream.
///
/// NOTE(review): this is presumably a separate module so that the opaque `MergedVnodeStream`
/// alias sits next to its single defining function; confirm.
mod merge_vnode_stream {

    use bytes::Bytes;
    use futures::{Stream, StreamExt, TryStreamExt};
    use risingwave_hummock_sdk::key::TableKey;

    use crate::error::StorageResult;
    use crate::table::KeyedRow;
    use crate::table::merge_sort::{NodePeek, merge_sort};

    /// How the per-vnode streams should be combined.
    pub(super) enum VnodeStreamType<RowSt, KeyedRowSt> {
        /// Exactly one vnode: its stream is used directly.
        Single(RowSt),
        /// Several vnodes whose rows may be interleaved in any order.
        Unordered(Vec<RowSt>),
        /// Several vnodes whose rows are merge-sorted by their keys.
        Ordered(Vec<KeyedRowSt>),
    }

    /// The opaque stream type produced by `merge_stream`; it yields rows without keys.
    pub(super) type MergedVnodeStream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    = impl Stream<Item = StorageResult<R>> + Send;

    /// Encoded key used to order rows when merging; it becomes `KeyedRow::vnode_prefixed_key`.
    pub(super) type SortKeyType = Bytes;

    /// Combines the streams in `stream` into one stream of rows, dropping the keys.
    ///
    /// - `Single`: the stream is passed through.
    /// - `Unordered`: streams are flattened with `flatten_unordered(1024)`, i.e. polled
    ///   concurrently and interleaved in arrival order.
    /// - `Ordered`: streams are merged by key via `merge_sort`.
    #[define_opaque(MergedVnodeStream)]
    pub(super) fn merge_stream<
        R: Send,
        RowSt: Stream<Item = StorageResult<((), R)>> + Send,
        KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
    >(
        stream: VnodeStreamType<RowSt, KeyedRowSt>,
    ) -> MergedVnodeStream<R, RowSt, KeyedRowSt>
    where
        KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
    {
        // `auto_enum` unifies the three differently-typed arms into one stream type.
        #[auto_enums::auto_enum(futures03::Stream)]
        match stream {
            VnodeStreamType::Single(stream) => stream.map_ok(|(_, row)| row),
            VnodeStreamType::Unordered(streams) => futures::stream::iter(
                streams
                    .into_iter()
                    .map(|stream| Box::pin(stream.map_ok(|(_, row)| row))),
            )
            .flatten_unordered(1024),
            VnodeStreamType::Ordered(streams) => merge_sort(streams.into_iter().map(|stream| {
                Box::pin(stream.map_ok(|(key, row)| KeyedRow {
                    vnode_prefixed_key: TableKey(key),
                    row,
                }))
            }))
            .map_ok(|keyed_row| keyed_row.row),
        }
    }
}
762
763use merge_vnode_stream::*;
764
765async fn build_vnode_stream<
766 R: Send,
767 RowSt: Stream<Item = StorageResult<((), R)>> + Send,
768 KeyedRowSt: Stream<Item = StorageResult<(SortKeyType, R)>> + Send,
769 RowStFut: Future<Output = StorageResult<RowSt>>,
770 KeyedRowStFut: Future<Output = StorageResult<KeyedRowSt>>,
771>(
772 row_stream_fn: impl Fn(VirtualNode) -> RowStFut,
773 keyed_row_stream_fn: impl Fn(VirtualNode) -> KeyedRowStFut,
774 vnodes: &[VirtualNode],
775 ordered: bool,
776) -> StorageResult<MergedVnodeStream<R, RowSt, KeyedRowSt>>
777where
778 KeyedRow<SortKeyType, R>: NodePeek + Send + Sync,
779{
780 let stream = match vnodes {
781 [] => unreachable!(),
782 [vnode] => VnodeStreamType::Single(row_stream_fn(*vnode).await?),
783 vnodes if !ordered => VnodeStreamType::Unordered(
785 try_join_all(vnodes.iter().map(|vnode| row_stream_fn(*vnode))).await?,
786 ),
787 vnodes => VnodeStreamType::Ordered(
789 try_join_all(vnodes.iter().map(|vnode| keyed_row_stream_fn(*vnode))).await?,
790 ),
791 };
792 Ok(merge_stream(stream))
793}
794
795impl<S: StateStore, SD: ValueRowSerde> BatchTableInner<S, SD> {
    /// Iterates rows whose encoded key (without vnode prefix) lies in `(start_bound, end_bound)`.
    ///
    /// The read uses a snapshot at `wait_epoch`. If `vnode_hint` is set, only that vnode is
    /// read; otherwise every vnode in this table's distribution bitmap is read. With several
    /// vnodes, `ordered` selects key-ordered merging over arbitrary interleaving.
    ///
    /// # Panics
    /// Panics if `vnode_hint` is not set in the distribution's vnode bitmap.
    async fn iter_with_encoded_key_range(
        &self,
        prefix_hint: Option<Bytes>,
        (start_bound, end_bound): (Bound<Bytes>, Bound<Bytes>),
        wait_epoch: HummockReadEpoch,
        vnode_hint: Option<VirtualNode>,
        ordered: bool,
        prefetch_options: PrefetchOptions,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        let vnodes = match vnode_hint {
            // The hinted vnode must belong to this table instance.
            Some(vnode) => {
                assert!(
                    self.distribution.vnodes().is_set(vnode.to_index()),
                    "vnode unset: {:?}, distribution: {:?}",
                    vnode,
                    self.distribution
                );
                vec![vnode]
            }
            None => self.distribution.vnodes().iter_vnodes().collect_vec(),
        };

        let read_snapshot = self
            .store
            .new_read_snapshot(
                wait_epoch,
                NewReadSnapshotOptions {
                    table_id: self.table_id,
                    table_option: self.table_option,
                },
            )
            .await?;

        // The two closures are textually identical but are instantiated with different key
        // types: `()` for the single/unordered path and `SortKeyType` for the ordered
        // merge-sort path (see `build_vnode_stream`'s bounds).
        build_vnode_stream(
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            |vnode| {
                self.iter_vnode_with_encoded_key_range(
                    &read_snapshot,
                    prefix_hint.clone(),
                    (start_bound.as_ref(), end_bound.as_ref()),
                    vnode,
                    prefetch_options,
                )
            },
            &vnodes,
            ordered,
        )
        .await
    }
859
860 async fn iter_vnode_with_encoded_key_range<K: CopyFromSlice>(
861 &self,
862 read_snapshot: &S::ReadSnapshot,
863 prefix_hint: Option<Bytes>,
864 encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
865 vnode: VirtualNode,
866 prefetch_options: PrefetchOptions,
867 ) -> StorageResult<impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, S, SD>>
868 {
869 let (table_key_range, read_options, pk_serializer) =
870 self.vnode_read_context(prefix_hint, encoded_key_range, vnode, prefetch_options);
871
872 let iter = read_snapshot.iter(table_key_range, read_options).await?;
873 Ok(self.iter_stream_from_state_store_iter::<K, _>(iter, pk_serializer))
874 }
875
876 fn vnode_read_context(
877 &self,
878 prefix_hint: Option<Bytes>,
879 encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
880 vnode: VirtualNode,
881 prefetch_options: PrefetchOptions,
882 ) -> (TableKeyRange, ReadOptions, Option<Arc<OrderedRowSerde>>) {
883 let cache_policy = match &encoded_key_range {
884 (Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(Hint::Low),
887 _ => CachePolicy::Fill(Hint::Normal),
888 };
889
890 let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
891 {
892 {
893 let read_options = ReadOptions {
894 prefix_hint,
895 prefetch_options,
896 cache_policy,
897 };
898 let pk_serializer = match self.output_row_in_key_indices.is_empty() {
899 true => None,
900 false => Some(Arc::new(self.pk_serializer.clone())),
901 };
902
903 (table_key_range, read_options, pk_serializer)
904 }
905 }
906 }
907
908 fn iter_stream_from_state_store_iter<K: CopyFromSlice, SI: StateStoreIter + Send>(
909 &self,
910 iter: SI,
911 pk_serializer: Option<Arc<OrderedRowSerde>>,
912 ) -> impl Stream<Item = StorageResult<(K, OwnedRow)>> + Send + use<K, SI, S, SD> {
913 BatchTableInnerIterInner {
914 iter,
915 mapping: self.mapping.clone(),
916 epoch_idx: self.epoch_idx,
917 row_deserializer: self.row_serde.clone(),
918 pk_serializer,
919 output_indices: self.output_indices.clone(),
920 key_output_indices: self.key_output_indices.clone(),
921 value_output_indices: self.value_output_indices.clone(),
922 output_row_in_key_indices: self.output_row_in_key_indices.clone(),
923 }
924 .into_stream::<K>()
925 }
926
927 fn serialize_pk_bound(
929 &self,
930 pk_prefix: impl Row,
931 range_bound: Bound<&OwnedRow>,
932 is_start_bound: bool,
933 ) -> Bound<Bytes> {
934 match range_bound {
935 Included(k) => {
936 let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
937 let key = pk_prefix.chain(k);
938 let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
939 if is_start_bound {
940 Included(serialized_key)
941 } else {
942 end_bound_of_prefix(&serialized_key)
945 }
946 }
947 Excluded(k) => {
948 let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
949 let key = pk_prefix.chain(k);
950 let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
951 if is_start_bound {
952 let next_serialized_key = next_key(&serialized_key);
957 assert!(!next_serialized_key.is_empty());
958 Included(Bytes::from(next_serialized_key))
959 } else {
960 Excluded(serialized_key)
961 }
962 }
963 Unbounded => {
964 let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
965 let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
966 if pk_prefix.is_empty() {
967 Unbounded
968 } else if is_start_bound {
969 Included(serialized_pk_prefix)
970 } else {
971 end_bound_of_prefix(&serialized_pk_prefix)
972 }
973 }
974 }
975 }
976
977 async fn iter_with_pk_bounds(
979 &self,
980 epoch: HummockReadEpoch,
981 pk_prefix: impl Row,
982 range_bounds: impl RangeBounds<OwnedRow>,
983 ordered: bool,
984 prefetch_options: PrefetchOptions,
985 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
986 let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
987 let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
988 assert!(pk_prefix.len() <= self.pk_indices.len());
989 let pk_prefix_indices = (0..pk_prefix.len())
990 .map(|index| self.pk_indices[index])
991 .collect_vec();
992
993 let prefix_hint = if should_calculate_prefix_hint(
994 self.read_prefix_len_hint,
995 pk_prefix.len(),
996 true,
997 ) {
998 let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
999 start_key
1000 } else {
1001 unreachable!()
1002 };
1003 let prefix_len = self
1004 .pk_serializer
1005 .deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
1006 Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
1007 } else {
1008 trace!(
1009 "iter_with_pk_bounds dist_key_indices table_id {} not match prefix pk_prefix {:?} pk_prefix_indices {:?}",
1010 self.table_id, pk_prefix, pk_prefix_indices
1011 );
1012 None
1013 };
1014
1015 trace!(
1016 "iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}",
1017 self.table_id, prefix_hint, start_key, end_key, pk_prefix, pk_prefix_indices
1018 );
1019
1020 self.iter_with_encoded_key_range(
1021 prefix_hint,
1022 (start_key, end_key),
1023 epoch,
1024 self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
1025 ordered,
1026 prefetch_options,
1027 )
1028 .await
1029 }
1030
    /// Buffers rows from `iter` into column arrays built from `schema`.
    ///
    /// It yields `(columns, row_count)` every `chunk_size` rows, and once more for any leftover
    /// rows. Builders are created lazily on the first row of each batch, so an empty input
    /// yields nothing and no empty trailing batch is emitted.
    #[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
    async fn convert_row_stream_to_array_vec_stream(
        iter: impl Stream<Item = StorageResult<OwnedRow>>,
        schema: Schema,
        chunk_size: usize,
    ) {
        use futures::{TryStreamExt, pin_mut};
        use risingwave_common::util::iter_util::ZipEqFast;

        pin_mut!(iter);

        let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
        let mut row_count = 0;

        while let Some(row) = iter.try_next().await? {
            row_count += 1;
            // (Re)create the builders for a new batch after the previous one was flushed.
            let builders_ref =
                builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
            for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
                builder.append(datum);
            }
            // Flush a full batch; `take` leaves `builders` empty for the next batch.
            if row_count == chunk_size {
                let columns: Vec<_> = builders
                    .take()
                    .unwrap()
                    .into_iter()
                    .map(|builder| builder.finish().into())
                    .collect();
                yield (columns, row_count);
                assert!(builders.is_none());
                row_count = 0;
            }
        }

        // Flush the final partial batch, if any rows were buffered.
        if let Some(builders) = builders {
            assert_gt!(row_count, 0);
            let columns: Vec<_> = builders
                .into_iter()
                .map(|builder| builder.finish().into())
                .collect();
            yield (columns, row_count);
        }
    }
1077
1078 async fn chunk_iter_with_pk_bounds(
1081 &self,
1082 epoch: HummockReadEpoch,
1083 pk_prefix: impl Row,
1084 range_bounds: impl RangeBounds<OwnedRow>,
1085 ordered: bool,
1086 chunk_size: usize,
1087 prefetch_options: PrefetchOptions,
1088 ) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
1089 let iter = self
1090 .iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
1091 .await?;
1092
1093 Ok(Self::convert_row_stream_to_array_vec_stream(
1094 iter,
1095 self.schema.clone(),
1096 chunk_size,
1097 ))
1098 }
1099
1100 pub async fn batch_iter_with_pk_bounds(
1103 &self,
1104 epoch: HummockReadEpoch,
1105 pk_prefix: impl Row,
1106 range_bounds: impl RangeBounds<OwnedRow>,
1107 ordered: bool,
1108 prefetch_options: PrefetchOptions,
1109 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
1110 self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
1111 .await
1112 }
1113
1114 pub async fn batch_iter(
1116 &self,
1117 epoch: HummockReadEpoch,
1118 ordered: bool,
1119 prefetch_options: PrefetchOptions,
1120 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send> {
1121 self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
1122 .await
1123 }
1124
1125 fn start_bound_from_pk(&self, start_pk: Option<&OwnedRow>) -> Bound<Bytes> {
1126 if let Some(start_pk) = start_pk {
1127 let mut bytes = BytesMut::new();
1128 self.pk_serializer.serialize(start_pk, &mut bytes);
1129 let bytes = bytes.freeze();
1130 Included(bytes)
1131 } else {
1132 Unbounded
1133 }
1134 }
1135
1136 pub async fn batch_iter_vnode(
1137 &self,
1138 epoch: HummockReadEpoch,
1139 start_pk: Option<&OwnedRow>,
1140 vnode: VirtualNode,
1141 prefetch_options: PrefetchOptions,
1142 rebuild_interval: Duration,
1143 ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
1144 {
1145 let full_pk_prefix = OwnedRow::default();
1146 let full_range = (Bound::Unbounded, Bound::Unbounded);
1147 self.batch_iter_vnode_with_pk_range(
1148 epoch,
1149 start_pk,
1150 &full_pk_prefix,
1151 &full_range,
1152 vnode,
1153 prefetch_options,
1154 rebuild_interval,
1155 )
1156 .await
1157 }
1158
    /// Scans one `vnode` for rows matching `pk_prefix` and `range_bounds`.
    ///
    /// - The bounds are first normalized with `PkScanRange::convert_to_range_bounds`, which
    ///   applies key ordering and NULL exclusion.
    /// - If `start_pk` is given, it replaces the start bound. It is encoded with the full pk
    ///   serializer as an inclusive bound, so the scan can resume from that key.
    /// - The snapshot is read through `iter_with_timeout_rebuild` with `rebuild_interval`.
    ///
    /// NOTE(review): unlike `iter_with_encoded_key_range`, `vnode` is not checked against the
    /// distribution's vnode bitmap here. The prefix hint is also derived from `pk_prefix`
    /// directly rather than through `should_calculate_prefix_hint`.
    ///
    /// # Panics
    /// Panics if `rebuild_interval` is zero.
    pub async fn batch_iter_vnode_with_pk_range(
        &self,
        epoch: HummockReadEpoch,
        start_pk: Option<&OwnedRow>,
        pk_prefix: &OwnedRow,
        range_bounds: &(Bound<OwnedRow>, Bound<OwnedRow>),
        vnode: VirtualNode,
        prefetch_options: PrefetchOptions,
        rebuild_interval: Duration,
    ) -> StorageResult<impl Stream<Item = StorageResult<OwnedRow>> + Send + 'static + use<S, SD>>
    {
        assert!(
            !rebuild_interval.is_zero(),
            "rebuild_interval should be positive"
        );

        let normalized_range_bounds = PkScanRange {
            pk_prefix: pk_prefix.clone(),
            range_bounds: range_bounds.clone(),
        }
        .convert_to_range_bounds(self);

        // Resuming from `start_pk` overrides the range's own start bound.
        let start_key = if let Some(start_pk) = start_pk {
            self.start_bound_from_pk(Some(start_pk))
        } else {
            self.serialize_pk_bound(pk_prefix, normalized_range_bounds.start_bound(), true)
        };
        let end_key =
            self.serialize_pk_bound(pk_prefix, normalized_range_bounds.end_bound(), false);

        // A prefix hint is only possible when the prefix covers at least
        // `read_prefix_len_hint` pk columns.
        let prefix_hint =
            if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint <= pk_prefix.len() {
                let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
                let serialized_pk_prefix = serialize_pk(pk_prefix, &pk_prefix_serializer);
                let prefix_len = self
                    .pk_serializer
                    .deserialize_prefix_len(&serialized_pk_prefix, self.read_prefix_len_hint)?;
                Some(Bytes::from(serialized_pk_prefix[..prefix_len].to_vec()))
            } else {
                None
            };

        let snapshot = Arc::new(
            self.store
                .new_read_snapshot(
                    epoch,
                    NewReadSnapshotOptions {
                        table_id: self.table_id,
                        table_option: self.table_option,
                    },
                )
                .await?,
        );
        let (table_key_range, read_options, pk_serializer) = self.vnode_read_context(
            prefix_hint,
            (start_key.as_ref(), end_key.as_ref()),
            vnode,
            prefetch_options,
        );
        let iter = iter_with_timeout_rebuild(
            snapshot,
            table_key_range,
            self.table_id,
            read_options,
            rebuild_interval,
        )
        .await?;
        let iter = self.iter_stream_from_state_store_iter::<(), _>(iter, pk_serializer);
        Ok(iter.map_ok(|(_, row)| row))
    }
1231
1232 pub async fn next_epoch(&self, epoch: u64) -> StorageResult<u64> {
1233 self.store
1234 .next_epoch(
1235 epoch,
1236 NextEpochOptions {
1237 table_id: self.table_id,
1238 },
1239 )
1240 .await
1241 }
1242
1243 pub async fn batch_iter_vnode_log(
1244 &self,
1245 start_epoch: u64,
1246 end_epoch: HummockReadEpoch,
1247 start_pk: Option<&OwnedRow>,
1248 vnode: VirtualNode,
1249 ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static + use<S, SD>>
1250 {
1251 let start_bound = self.start_bound_from_pk(start_pk);
1252 let stream = self
1253 .batch_iter_log_inner::<()>(
1254 start_epoch,
1255 end_epoch,
1256 (start_bound.as_ref(), Unbounded),
1257 vnode,
1258 )
1259 .await?;
1260 Ok(stream.map_ok(|(_, row)| row))
1261 }
1262
1263 pub async fn batch_iter_log_with_pk_bounds(
1264 &self,
1265 start_epoch: u64,
1266 end_epoch: HummockReadEpoch,
1267 ordered: bool,
1268 range_bounds: impl RangeBounds<OwnedRow>,
1269 pk_prefix: impl Row,
1270 ) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send> {
1271 let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
1272 let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
1273 let vnodes = self.distribution.vnodes().iter_vnodes().collect_vec();
1274 build_vnode_stream(
1275 |vnode| {
1276 self.batch_iter_log_inner(
1277 start_epoch,
1278 end_epoch,
1279 (start_key.as_ref(), end_key.as_ref()),
1280 vnode,
1281 )
1282 },
1283 |vnode| {
1284 self.batch_iter_log_inner(
1285 start_epoch,
1286 end_epoch,
1287 (start_key.as_ref(), end_key.as_ref()),
1288 vnode,
1289 )
1290 },
1291 &vnodes,
1292 ordered,
1293 )
1294 .await
1295 }
1296
1297 async fn batch_iter_log_inner<K: CopyFromSlice>(
1298 &self,
1299 start_epoch: u64,
1300 end_epoch: HummockReadEpoch,
1301 encoded_key_range: (Bound<&Bytes>, Bound<&Bytes>),
1302 vnode: VirtualNode,
1303 ) -> StorageResult<impl Stream<Item = StorageResult<(K, ChangeLogRow)>> + Send + use<K, S, SD>>
1304 {
1305 let table_key_range = prefixed_range_with_vnode::<&Bytes>(encoded_key_range, vnode);
1306 let read_options = ReadLogOptions {
1307 table_id: self.table_id,
1308 };
1309 let iter = BatchTableInnerIterLogInner::<S, SD>::new(
1310 &self.store,
1311 self.mapping.clone(),
1312 self.row_serde.clone(),
1313 table_key_range,
1314 read_options,
1315 start_epoch,
1316 end_epoch,
1317 )
1318 .await?
1319 .into_stream::<K>();
1320
1321 Ok(iter)
1322 }
1323
1324 pub async fn batch_chunk_iter_with_pk_bounds(
1327 &self,
1328 epoch: HummockReadEpoch,
1329 pk_prefix: impl Row,
1330 range_bounds: impl RangeBounds<OwnedRow>,
1331 ordered: bool,
1332 chunk_size: usize,
1333 prefetch_options: PrefetchOptions,
1334 ) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
1335 let iter = self
1336 .chunk_iter_with_pk_bounds(
1337 epoch,
1338 pk_prefix,
1339 range_bounds,
1340 ordered,
1341 chunk_size,
1342 prefetch_options,
1343 )
1344 .await?;
1345
1346 Ok(iter.map(|item| {
1347 let (columns, row_count) = item?;
1348 Ok(DataChunk::new(columns, row_count))
1349 }))
1350 }
1351}
1352
1353struct BatchTableInnerIterInner<SI: StateStoreIter, SD: ValueRowSerde> {
1355 iter: SI,
1357
1358 mapping: Arc<ColumnMapping>,
1359
1360 epoch_idx: Option<usize>,
1362
1363 row_deserializer: Arc<SD>,
1364
1365 pk_serializer: Option<Arc<OrderedRowSerde>>,
1367
1368 output_indices: Vec<usize>,
1369
1370 key_output_indices: Option<Vec<usize>>,
1372
1373 value_output_indices: Vec<usize>,
1375
1376 output_row_in_key_indices: Vec<usize>,
1378}
1379
1380impl<SI: StateStoreIter, SD: ValueRowSerde> BatchTableInnerIterInner<SI, SD> {
1381 #[try_stream(ok = (K, OwnedRow), error = StorageError)]
1383 async fn into_stream<K: CopyFromSlice>(mut self) {
1384 while let Some((k, v)) = self
1385 .iter
1386 .try_next()
1387 .instrument_await("storage_table_iter_next".verbose())
1388 .await?
1389 {
1390 let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
1391 let row = self.row_deserializer.deserialize(value)?;
1392 let result_row_in_value = self.mapping.project(OwnedRow::new(row));
1393 let row = match &self.key_output_indices {
1394 Some(key_output_indices) => {
1395 let result_row_in_key = match self.pk_serializer.clone() {
1396 Some(pk_serializer) => {
1397 let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;
1398
1399 pk.project(&self.output_row_in_key_indices).into_owned_row()
1400 }
1401 None => OwnedRow::empty(),
1402 };
1403
1404 let mut result_row_vec = vec![];
1405 for idx in &self.output_indices {
1406 if let Some(epoch_idx) = self.epoch_idx
1407 && *idx == epoch_idx
1408 {
1409 let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1410 result_row_vec
1411 .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1412 } else if self.value_output_indices.contains(idx) {
1413 let item_position_in_value_indices = &self
1414 .value_output_indices
1415 .iter()
1416 .position(|p| idx == p)
1417 .unwrap();
1418 result_row_vec.push(
1419 result_row_in_value
1420 .datum_at(*item_position_in_value_indices)
1421 .to_owned_datum(),
1422 );
1423 } else {
1424 let item_position_in_pk_indices =
1425 key_output_indices.iter().position(|p| idx == p).unwrap();
1426 result_row_vec.push(
1427 result_row_in_key
1428 .datum_at(item_position_in_pk_indices)
1429 .to_owned_datum(),
1430 );
1431 }
1432 }
1433 OwnedRow::new(result_row_vec)
1434 }
1435 None => match &self.epoch_idx {
1436 Some(epoch_idx) => {
1437 let mut result_row_vec = vec![];
1438 for idx in &self.output_indices {
1439 if idx == epoch_idx {
1440 let epoch = Epoch::from(epoch_with_gap.pure_epoch());
1441 result_row_vec
1442 .push(risingwave_common::types::Datum::from(epoch.as_scalar()));
1443 } else {
1444 let item_position_in_value_indices = &self
1445 .value_output_indices
1446 .iter()
1447 .position(|p| idx == p)
1448 .unwrap();
1449 result_row_vec.push(
1450 result_row_in_value
1451 .datum_at(*item_position_in_value_indices)
1452 .to_owned_datum(),
1453 );
1454 }
1455 }
1456 OwnedRow::new(result_row_vec)
1457 }
1458 None => result_row_in_value.into_owned_row(),
1459 },
1460 };
1461 yield (K::copy_from_slice(table_key.as_ref()), row);
1462 }
1463 }
1464}
1465
1466struct BatchTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
1468 iter: S::ChangeLogIter,
1470
1471 mapping: Arc<ColumnMapping>,
1472
1473 row_deserializer: Arc<SD>,
1474}
1475
1476impl<S: StateStore, SD: ValueRowSerde> BatchTableInnerIterLogInner<S, SD> {
1477 #[allow(clippy::too_many_arguments)]
1479 async fn new(
1480 store: &S,
1481 mapping: Arc<ColumnMapping>,
1482 row_deserializer: Arc<SD>,
1483 table_key_range: TableKeyRange,
1484 read_options: ReadLogOptions,
1485 start_epoch: u64,
1486 end_epoch: HummockReadEpoch,
1487 ) -> StorageResult<Self> {
1488 store
1489 .try_wait_epoch(
1490 end_epoch,
1491 TryWaitEpochOptions {
1492 table_id: read_options.table_id,
1493 },
1494 )
1495 .await?;
1496 let iter = store
1497 .iter_log(
1498 (start_epoch, end_epoch.get_epoch()),
1499 table_key_range,
1500 read_options,
1501 )
1502 .await?;
1503 let iter = Self {
1504 iter,
1505 mapping,
1506 row_deserializer,
1507 };
1508 Ok(iter)
1509 }
1510
1511 fn into_stream<K: CopyFromSlice>(self) -> impl Stream<Item = StorageResult<(K, ChangeLogRow)>> {
1513 self.iter.into_stream(move |(table_key, value)| {
1514 value
1515 .try_map(|value| {
1516 let full_row = self.row_deserializer.deserialize(value)?;
1517 let row = self
1518 .mapping
1519 .project(OwnedRow::new(full_row))
1520 .into_owned_row();
1521 Ok(row)
1522 })
1523 .map(|row| (K::copy_from_slice(table_key.as_ref()), row))
1524 })
1525 }
1526}