use std::ops::Bound::{self, Excluded, Included, Unbounded};
use std::ops::RangeBounds;
use std::sync::Arc;
use auto_enums::auto_enum;
use await_tree::InstrumentAwait;
use bytes::Bytes;
use foyer::CacheContext;
use futures::future::try_join_all;
use futures::{Stream, StreamExt};
use futures_async_stream::try_stream;
use itertools::{Either, Itertools};
use more_asserts::assert_gt;
use risingwave_common::array::{ArrayBuilderImpl, ArrayRef, DataChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, TableId, TableOption};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt};
use risingwave_common::row::{self, OwnedRow, Row, RowExt};
use risingwave_common::types::ToOwnedDatum;
use risingwave_common::util::epoch::Epoch;
use risingwave_common::util::row_serde::*;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::column_aware_row_encoding::ColumnAwareSerde;
use risingwave_common::util::value_encoding::{BasicSerde, EitherSerde};
use risingwave_hummock_sdk::key::{
end_bound_of_prefix, next_key, prefixed_range_with_vnode, TableKeyRange,
};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use tracing::trace;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::row_serde::row_serde_util::{serialize_pk, serialize_pk_with_vnode};
use crate::row_serde::value_serde::{ValueRowSerde, ValueRowSerdeNew};
use crate::row_serde::{find_columns_by_ids, ColumnMapping};
use crate::store::{
PrefetchOptions, ReadLogOptions, ReadOptions, StateStoreIter, StateStoreIterExt,
TryWaitEpochOptions,
};
use crate::table::merge_sort::merge_sort;
use crate::table::{ChangeLogRow, KeyedChangeLogRow, KeyedRow, TableDistribution, TableIter};
use crate::StateStore;
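
/// [`StorageTableInner`] is the interface for reading rows that are stored in the key-value
/// state store `S` with a row-based encoding, parameterized by the value serde `SD`. It only
/// supports reads: point get by primary key and (chunked) range scans.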
#[derive(Clone)]
pub struct StorageTableInner<S: StateStore, SD: ValueRowSerde> {
    table_id: TableId,
    store: S,
    /// Schema of the output columns, i.e. the columns to read.
    schema: Schema,
    /// Serializes primary keys into memcomparable bytes.
    pk_serializer: OrderedRowSerde,
    /// Indices of the output columns in the table's full column set.
    output_indices: Vec<usize>,
    /// Output columns that come from the storage key instead of the value, or `None` if
    /// every output column is in the value.
    key_output_indices: Option<Vec<usize>>,
    /// Output columns that come from the storage value.
    value_output_indices: Vec<usize>,
    /// Positions of the key-output columns within the primary key.
    output_row_in_key_indices: Vec<usize>,
    /// Maps the deserialized value row to the output row.
    mapping: Arc<ColumnMapping>,
    /// Index of the hidden epoch column in the output, if it is requested.
    epoch_idx: Option<usize>,
    row_serde: Arc<SD>,
    /// Indices of the primary-key columns in the table's full column set.
    pk_indices: Vec<usize>,
    distribution: TableDistribution,
    /// Table options, e.g. data retention in seconds.
    table_option: TableOption,
    /// Length of the pk prefix that can serve as a bloom-filter hint on reads.
    read_prefix_len_hint: usize,
}
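
/// The default storage table. [`EitherSerde`] dispatches at runtime between the basic and
/// the column-aware (versioned) value encoding, chosen in `new_inner`.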
pub type StorageTable<S> = StorageTableInner<S, EitherSerde>;
impl<S: StateStore, SD: ValueRowSerde> std::fmt::Debug for StorageTableInner<S, SD> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("StorageTableInner").finish_non_exhaustive()
}
}
impl<S: StateStore> StorageTableInner<S, EitherSerde> {
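    /// Creates a table reader that outputs only `output_column_ids` of the table described
    /// by `table_desc`, optionally restricted to the vnodes in `vnodes`.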
pub fn new_partial(
store: S,
output_column_ids: Vec<ColumnId>,
vnodes: Option<Arc<Bitmap>>,
table_desc: &StorageTableDesc,
) -> Self {
let table_id = TableId {
table_id: table_desc.table_id,
};
let column_descs = table_desc
.columns
.iter()
.map(ColumnDesc::from)
.collect_vec();
let order_types: Vec<OrderType> = table_desc
.pk
.iter()
.map(|order| OrderType::from_protobuf(order.get_order_type().unwrap()))
.collect();
let pk_indices = table_desc
.pk
.iter()
.map(|k| k.column_index as usize)
.collect_vec();
let table_option = TableOption {
retention_seconds: table_desc.retention_seconds,
};
let value_indices = table_desc
.get_value_indices()
.iter()
.map(|&k| k as usize)
.collect_vec();
let prefix_hint_len = table_desc.get_read_prefix_len_hint() as usize;
let versioned = table_desc.versioned;
let distribution = TableDistribution::new_from_storage_table_desc(vnodes, table_desc);
Self::new_inner(
store,
table_id,
column_descs,
output_column_ids,
order_types,
pk_indices,
distribution,
table_option,
value_indices,
prefix_hint_len,
versioned,
)
}
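
    /// Creates a singleton-distribution table for tests, outputting only `output_column_ids`.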
pub fn for_test_with_partial_columns(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
output_column_ids: Vec<ColumnId>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
value_indices: Vec<usize>,
) -> Self {
Self::new_inner(
store,
table_id,
columns,
output_column_ids,
order_types,
pk_indices,
TableDistribution::singleton(),
Default::default(),
value_indices,
0,
false,
)
}
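
    /// Creates a singleton-distribution table for tests, outputting all `columns`.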
pub fn for_test(
store: S,
table_id: TableId,
columns: Vec<ColumnDesc>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
value_indices: Vec<usize>,
) -> Self {
let output_column_ids = columns.iter().map(|c| c.column_id).collect();
Self::for_test_with_partial_columns(
store,
table_id,
columns,
output_column_ids,
order_types,
pk_indices,
value_indices,
)
}
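
    /// The shared constructor: resolves the output columns, builds the pk serializer, and
    /// picks the value serde (column-aware when `versioned`, basic otherwise).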
#[allow(clippy::too_many_arguments)]
fn new_inner(
store: S,
table_id: TableId,
table_columns: Vec<ColumnDesc>,
output_column_ids: Vec<ColumnId>,
order_types: Vec<OrderType>,
pk_indices: Vec<usize>,
distribution: TableDistribution,
table_option: TableOption,
value_indices: Vec<usize>,
read_prefix_len_hint: usize,
versioned: bool,
) -> Self {
assert_eq!(order_types.len(), pk_indices.len());
let (output_columns, output_indices) =
find_columns_by_ids(&table_columns, &output_column_ids);
let mut value_output_indices = vec![];
let mut key_output_indices = vec![];
let mut epoch_idx = None;
        // Classify each output column: it comes either from the value, from the primary
        // key, or is the (single) hidden epoch column.
        for idx in &output_indices {
            if value_indices.contains(idx) {
                value_output_indices.push(*idx);
            } else if pk_indices.contains(idx) {
                key_output_indices.push(*idx);
            } else {
                assert!(epoch_idx.is_none(), "at most one epoch column is expected");
                epoch_idx = Some(*idx);
            }
        }
let output_row_in_key_indices = key_output_indices
.iter()
.map(|&di| pk_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let schema = Schema::new(output_columns.iter().map(Into::into).collect());
let pk_data_types = pk_indices
.iter()
.map(|i| table_columns[*i].data_type.clone())
.collect();
let pk_serializer = OrderedRowSerde::new(pk_data_types, order_types);
let (row_serde, mapping) = {
if versioned {
let value_output_indices_dedup = value_output_indices
.iter()
.unique()
.copied()
.collect::<Vec<_>>();
let output_row_in_value_output_indices_dedup = value_output_indices
.iter()
.map(|&di| {
value_output_indices_dedup
.iter()
.position(|&pi| di == pi)
.unwrap()
})
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_output_indices_dedup);
let serde =
ColumnAwareSerde::new(value_output_indices_dedup.into(), table_columns.into());
(serde.into(), mapping)
} else {
let output_row_in_value_indices = value_output_indices
.iter()
.map(|&di| value_indices.iter().position(|&pi| di == pi).unwrap())
.collect_vec();
let mapping = ColumnMapping::new(output_row_in_value_indices);
let serde = BasicSerde::new(value_indices.into(), table_columns.into());
(serde.into(), mapping)
}
};
let key_output_indices = match key_output_indices.is_empty() {
true => None,
false => Some(key_output_indices),
};
Self {
table_id,
store,
schema,
pk_serializer,
output_indices,
key_output_indices,
value_output_indices,
output_row_in_key_indices,
mapping: Arc::new(mapping),
epoch_idx,
row_serde: Arc::new(row_serde),
pk_indices,
distribution,
table_option,
read_prefix_len_hint,
}
}
}
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
pub fn pk_serializer(&self) -> &OrderedRowSerde {
&self.pk_serializer
}
pub fn schema(&self) -> &Schema {
&self.schema
}
pub fn pk_indices(&self) -> &[usize] {
&self.pk_indices
}
pub fn output_indices(&self) -> &[usize] {
&self.output_indices
}
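
    /// Maps each primary-key column to its position in the output columns, or `None` if
    /// some pk column is not part of the output.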
pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
self.pk_indices
.iter()
.map(|&i| self.output_indices.iter().position(|&j| i == j))
.collect()
}
pub fn table_id(&self) -> TableId {
self.table_id
}
pub fn vnodes(&self) -> &Arc<Bitmap> {
self.distribution.vnodes()
}
}
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
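    /// Point-gets a row by the full primary key `pk`, after waiting until `wait_epoch` is
    /// readable. Returns `None` if the key does not exist.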
pub async fn get_row(
&self,
pk: impl Row,
wait_epoch: HummockReadEpoch,
) -> StorageResult<Option<OwnedRow>> {
let epoch = wait_epoch.get_epoch();
let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
let read_committed = wait_epoch.is_read_committed();
self.store
.try_wait_epoch(
wait_epoch,
TryWaitEpochOptions {
table_id: self.table_id,
},
)
.await?;
let serialized_pk = serialize_pk_with_vnode(
&pk,
&self.pk_serializer,
self.distribution.compute_vnode_by_pk(&pk),
);
assert!(pk.len() <= self.pk_indices.len());
let prefix_hint = if self.read_prefix_len_hint != 0 && self.read_prefix_len_hint == pk.len()
{
Some(serialized_pk.slice(VirtualNode::SIZE..))
} else {
None
};
let read_options = ReadOptions {
prefix_hint,
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
read_version_from_backup: read_backup,
read_committed,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
};
if let Some((full_key, value)) = self
.store
.get_keyed_row(serialized_pk, epoch, read_options)
.await?
{
let row = self.row_serde.deserialize(&value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));
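            // Assemble the output row: pk-derived columns, value columns, and the optional
            // epoch column are stitched together in output order.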
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key =
pk.project(&self.output_row_in_key_indices).into_owned_row();
let mut result_row_vec = vec![];
for idx in &self.output_indices {
if let Some(epoch_idx) = self.epoch_idx
&& *idx == epoch_idx
{
let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
result_row_vec
.push(risingwave_common::types::Datum::from(epoch.as_scalar()));
} else if self.value_output_indices.contains(idx) {
let item_position_in_value_indices = &self
.value_output_indices
.iter()
.position(|p| idx == p)
.unwrap();
result_row_vec.push(
result_row_in_value
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => match &self.epoch_idx {
Some(epoch_idx) => {
let mut result_row_vec = vec![];
for idx in &self.output_indices {
if idx == epoch_idx {
let epoch = Epoch::from(full_key.epoch_with_gap.pure_epoch());
result_row_vec
.push(risingwave_common::types::Datum::from(epoch.as_scalar()));
} else {
let item_position_in_value_indices = &self
.value_output_indices
.iter()
.position(|p| idx == p)
.unwrap();
result_row_vec.push(
result_row_in_value
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
}
}
let result_row = OwnedRow::new(result_row_vec);
Ok(Some(result_row))
}
None => Ok(Some(result_row_in_value.into_owned_row())),
},
}
} else {
Ok(None)
}
}
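
    /// Updates the vnode bitmap of the table distribution and returns the previous bitmap,
    /// so that the caller can decide how to adjust any vnode-keyed caches.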
#[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> Arc<Bitmap> {
self.distribution.update_vnode_bitmap(new_vnodes)
}
}
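
/// A stream of [`KeyedRow`]s, with a blanket [`TableIter`] impl below. Note that the
/// `trait X = ...` syntax requires the nightly `trait_alias` feature.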
pub trait PkAndRowStream = Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send;
#[async_trait::async_trait]
impl<S: PkAndRowStream + Unpin> TableIter for S {
async fn next_row(&mut self) -> StorageResult<Option<OwnedRow>> {
self.next()
.await
.transpose()
.map(|r| r.map(|keyed_row| keyed_row.into_owned_row()))
}
}
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
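    /// Iterates rows whose encoded keys fall within `encoded_key_range`, scanning every
    /// vnode of the distribution unless `vnode_hint` pins the scan to a single vnode.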
async fn iter_with_encoded_key_range(
&self,
prefix_hint: Option<Bytes>,
encoded_key_range: (Bound<Bytes>, Bound<Bytes>),
wait_epoch: HummockReadEpoch,
vnode_hint: Option<VirtualNode>,
ordered: bool,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
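        // Unbounded scans are likely to touch a lot of cold data, so fill the block cache
        // with low priority to avoid evicting hot entries.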
let cache_policy = match (
encoded_key_range.start_bound(),
encoded_key_range.end_bound(),
) {
(Unbounded, _) | (_, Unbounded) => CachePolicy::Fill(CacheContext::LowPriority),
_ => CachePolicy::Fill(CacheContext::Default),
};
let table_key_ranges = {
let vnodes = match vnode_hint {
Some(vnode) => Either::Left(std::iter::once(vnode)),
None => Either::Right(self.distribution.vnodes().iter_vnodes()),
};
vnodes.map(|vnode| prefixed_range_with_vnode(encoded_key_range.clone(), vnode))
};
let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| {
let prefix_hint = prefix_hint.clone();
let read_backup = matches!(wait_epoch, HummockReadEpoch::Backup(_));
let read_committed = wait_epoch.is_read_committed();
async move {
let read_options = ReadOptions {
prefix_hint,
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
read_version_from_backup: read_backup,
read_committed,
prefetch_options,
cache_policy,
};
let pk_serializer = match self.output_row_in_key_indices.is_empty() {
true => None,
false => Some(Arc::new(self.pk_serializer.clone())),
};
let iter = StorageTableInnerIterInner::<S, SD>::new(
&self.store,
self.mapping.clone(),
self.epoch_idx,
pk_serializer,
self.output_indices.clone(),
self.key_output_indices.clone(),
self.value_output_indices.clone(),
self.output_row_in_key_indices.clone(),
self.row_serde.clone(),
table_key_range,
read_options,
wait_epoch,
)
.await?
.into_stream();
Ok::<_, StorageError>(iter)
}
}))
.await?;
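        // Merge the per-vnode streams: a single stream is returned as-is; unordered scans
        // interleave the streams, while ordered scans merge-sort them to keep pk order.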
#[auto_enum(futures03::Stream)]
let iter = match iterators.len() {
0 => unreachable!(),
1 => iterators.into_iter().next().unwrap(),
_ if !ordered => {
futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
.flatten_unordered(1024)
}
_ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
};
Ok(iter)
}
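
    /// Serializes `pk_prefix` chained with `range_bound` into an encoded key bound. Note
    /// that an `Included` end bound becomes the exclusive end bound of the serialized
    /// prefix, and an `Excluded` start bound becomes `Included(next_key(..))`.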
fn serialize_pk_bound(
&self,
pk_prefix: impl Row,
range_bound: Bound<&OwnedRow>,
is_start_bound: bool,
) -> Bound<Bytes> {
match range_bound {
Included(k) => {
let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
let key = pk_prefix.chain(k);
let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
if is_start_bound {
Included(serialized_key)
} else {
end_bound_of_prefix(&serialized_key)
}
}
Excluded(k) => {
let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len() + k.len());
let key = pk_prefix.chain(k);
let serialized_key = serialize_pk(&key, &pk_prefix_serializer);
if is_start_bound {
let next_serialized_key = next_key(&serialized_key);
assert!(!next_serialized_key.is_empty());
Included(Bytes::from(next_serialized_key))
} else {
Excluded(serialized_key)
}
}
Unbounded => {
let pk_prefix_serializer = self.pk_serializer.prefix(pk_prefix.len());
let serialized_pk_prefix = serialize_pk(&pk_prefix, &pk_prefix_serializer);
if pk_prefix.is_empty() {
Unbounded
} else if is_start_bound {
Included(serialized_pk_prefix)
} else {
end_bound_of_prefix(&serialized_pk_prefix)
}
}
}
}
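
    /// Iterates rows that match `pk_prefix` and, on the remaining pk columns, the given
    /// `range_bounds`. A bloom-filter prefix hint is attached when the prefix covers at
    /// least `read_prefix_len_hint` pk columns.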
async fn iter_with_pk_bounds(
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row,
range_bounds: impl RangeBounds<OwnedRow>,
ordered: bool,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
let start_key = self.serialize_pk_bound(&pk_prefix, range_bounds.start_bound(), true);
let end_key = self.serialize_pk_bound(&pk_prefix, range_bounds.end_bound(), false);
assert!(pk_prefix.len() <= self.pk_indices.len());
let pk_prefix_indices = (0..pk_prefix.len())
.map(|index| self.pk_indices[index])
.collect_vec();
let prefix_hint = if self.read_prefix_len_hint != 0
&& self.read_prefix_len_hint <= pk_prefix.len()
{
let encoded_prefix = if let Bound::Included(start_key) = start_key.as_ref() {
start_key
} else {
unreachable!()
};
let prefix_len = self
.pk_serializer
.deserialize_prefix_len(encoded_prefix, self.read_prefix_len_hint)?;
Some(Bytes::from(encoded_prefix[..prefix_len].to_vec()))
} else {
            trace!(
                "iter_with_pk_bounds table_id {} pk_prefix {:?} pk_prefix_indices {:?}: pk prefix does not cover read_prefix_len_hint, skipping prefix hint",
                self.table_id,
                pk_prefix,
                pk_prefix_indices
            );
None
};
trace!(
"iter_with_pk_bounds table_id {} prefix_hint {:?} start_key: {:?}, end_key: {:?} pk_prefix {:?} pk_prefix_indices {:?}" ,
self.table_id,
prefix_hint,
start_key,
end_key,
pk_prefix,
pk_prefix_indices
);
self.iter_with_encoded_key_range(
prefix_hint,
(start_key, end_key),
epoch,
self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix),
ordered,
prefetch_options,
)
.await
}
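
    /// Batches a row stream into column arrays of at most `chunk_size` rows, yielding each
    /// batch of arrays together with its row count.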
#[try_stream(ok = (Vec<ArrayRef>, usize), error = StorageError)]
async fn convert_row_stream_to_array_vec_stream(
iter: impl Stream<Item = StorageResult<KeyedRow<Bytes>>>,
schema: Schema,
chunk_size: usize,
) {
use futures::{pin_mut, TryStreamExt};
use risingwave_common::util::iter_util::ZipEqFast;
pin_mut!(iter);
let mut builders: Option<Vec<ArrayBuilderImpl>> = None;
let mut row_count = 0;
while let Some(row) = iter.try_next().await? {
row_count += 1;
let builders_ref =
builders.get_or_insert_with(|| schema.create_array_builders(chunk_size));
for (datum, builder) in row.iter().zip_eq_fast(builders_ref.iter_mut()) {
builder.append(datum);
}
if row_count == chunk_size {
let columns: Vec<_> = builders
.take()
.unwrap()
.into_iter()
.map(|builder| builder.finish().into())
.collect();
yield (columns, row_count);
assert!(builders.is_none());
row_count = 0;
}
}
if let Some(builders) = builders {
assert_gt!(row_count, 0);
let columns: Vec<_> = builders
.into_iter()
.map(|builder| builder.finish().into())
.collect();
yield (columns, row_count);
}
}
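
    /// Like [`Self::iter_with_pk_bounds`], but yields columnar batches of up to
    /// `chunk_size` rows instead of individual rows.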
async fn chunk_iter_with_pk_bounds(
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row,
range_bounds: impl RangeBounds<OwnedRow>,
ordered: bool,
chunk_size: usize,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<(Vec<ArrayRef>, usize)>> + Send> {
let iter = self
.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
.await?;
Ok(Self::convert_row_stream_to_array_vec_stream(
iter,
self.schema.clone(),
chunk_size,
))
}
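
    /// Constructs a stream of [`KeyedRow`]s with the given pk bounds for batch queries.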
pub async fn batch_iter_with_pk_bounds(
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row,
range_bounds: impl RangeBounds<OwnedRow>,
ordered: bool,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
self.iter_with_pk_bounds(epoch, pk_prefix, range_bounds, ordered, prefetch_options)
.await
}
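
    /// Constructs a stream of [`KeyedRow`]s over the whole table for batch queries.
    ///
    /// A minimal usage sketch, assuming an already-constructed `table` and a read `epoch`;
    /// marked `ignore` since it needs a running state store:
    ///
    /// ```ignore
    /// use futures::TryStreamExt;
    ///
    /// // Assumption: `PrefetchOptions` implements `Default`.
    /// let stream = table
    ///     .batch_iter(epoch, /* ordered */ true, PrefetchOptions::default())
    ///     .await?;
    /// futures::pin_mut!(stream);
    /// while let Some(keyed_row) = stream.try_next().await? {
    ///     let row = keyed_row.into_owned_row();
    ///     // ... consume `row` ...
    /// }
    /// ```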
pub async fn batch_iter(
&self,
epoch: HummockReadEpoch,
ordered: bool,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Send> {
self.batch_iter_with_pk_bounds(epoch, row::empty(), .., ordered, prefetch_options)
.await
}
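
    /// Iterates the change log of the table between `start_epoch` and `end_epoch` across
    /// all vnodes, yielding [`ChangeLogRow`]s.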
pub async fn batch_iter_log_with_pk_bounds(
&self,
start_epoch: u64,
end_epoch: HummockReadEpoch,
ordered: bool,
) -> StorageResult<impl Stream<Item = StorageResult<ChangeLogRow>> + Send + 'static> {
let pk_prefix = OwnedRow::default();
let start_key = self.serialize_pk_bound(&pk_prefix, Unbounded, true);
let end_key = self.serialize_pk_bound(&pk_prefix, Unbounded, false);
assert!(pk_prefix.len() <= self.pk_indices.len());
let table_key_ranges = {
let vnodes = match self.distribution.try_compute_vnode_by_pk_prefix(pk_prefix) {
Some(vnode) => Either::Left(std::iter::once(vnode)),
None => Either::Right(self.distribution.vnodes().iter_vnodes()),
};
vnodes
.map(|vnode| prefixed_range_with_vnode((start_key.clone(), end_key.clone()), vnode))
};
let iterators: Vec<_> = try_join_all(table_key_ranges.map(|table_key_range| async move {
let read_options = ReadLogOptions {
table_id: self.table_id,
};
let iter = StorageTableInnerIterLogInner::<S, SD>::new(
&self.store,
self.mapping.clone(),
self.row_serde.clone(),
table_key_range,
read_options,
start_epoch,
end_epoch,
)
.await?
.into_stream();
Ok::<_, StorageError>(iter)
}))
.await?;
#[auto_enum(futures03::Stream)]
let iter = match iterators.len() {
0 => unreachable!(),
1 => iterators.into_iter().next().unwrap(),
_ if !ordered => {
futures::stream::iter(iterators.into_iter().map(Box::pin).collect_vec())
.flatten_unordered(1024)
}
_ => merge_sort(iterators.into_iter().map(Box::pin).collect()),
}
.map(|row| row.map(|key_row| key_row.into_owned_row()));
Ok(iter)
}
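
    /// Like [`Self::batch_iter_with_pk_bounds`], but assembles the rows into
    /// [`DataChunk`]s of up to `chunk_size` rows.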
pub async fn batch_chunk_iter_with_pk_bounds(
&self,
epoch: HummockReadEpoch,
pk_prefix: impl Row,
range_bounds: impl RangeBounds<OwnedRow>,
ordered: bool,
chunk_size: usize,
prefetch_options: PrefetchOptions,
) -> StorageResult<impl Stream<Item = StorageResult<DataChunk>> + Send> {
let iter = self
.chunk_iter_with_pk_bounds(
epoch,
pk_prefix,
range_bounds,
ordered,
chunk_size,
prefetch_options,
)
.await?;
Ok(iter.map(|item| {
let (columns, row_count) = item?;
Ok(DataChunk::new(columns, row_count))
}))
}
}
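
/// [`StorageTableInnerIterInner`] iterates on the storage representation of a single key
/// range and deserializes the raw key-value pairs into output rows.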
struct StorageTableInnerIterInner<S: StateStore, SD: ValueRowSerde> {
iter: S::Iter,
mapping: Arc<ColumnMapping>,
epoch_idx: Option<usize>,
row_deserializer: Arc<SD>,
pk_serializer: Option<Arc<OrderedRowSerde>>,
output_indices: Vec<usize>,
key_output_indices: Option<Vec<usize>>,
value_output_indices: Vec<usize>,
output_row_in_key_indices: Vec<usize>,
}
impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterInner<S, SD> {
#[allow(clippy::too_many_arguments)]
async fn new(
store: &S,
mapping: Arc<ColumnMapping>,
epoch_idx: Option<usize>,
pk_serializer: Option<Arc<OrderedRowSerde>>,
output_indices: Vec<usize>,
key_output_indices: Option<Vec<usize>>,
value_output_indices: Vec<usize>,
output_row_in_key_indices: Vec<usize>,
row_deserializer: Arc<SD>,
table_key_range: TableKeyRange,
read_options: ReadOptions,
epoch: HummockReadEpoch,
) -> StorageResult<Self> {
let raw_epoch = epoch.get_epoch();
store
.try_wait_epoch(
epoch,
TryWaitEpochOptions {
table_id: read_options.table_id,
},
)
.await?;
let iter = store.iter(table_key_range, raw_epoch, read_options).await?;
let iter = Self {
iter,
mapping,
epoch_idx,
row_deserializer,
pk_serializer,
output_indices,
key_output_indices,
value_output_indices,
output_row_in_key_indices,
};
Ok(iter)
}
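
    /// Yields deserialized rows, each paired with its vnode-prefixed storage key.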
#[try_stream(ok = KeyedRow<Bytes>, error = StorageError)]
async fn into_stream(mut self) {
while let Some((k, v)) = self
.iter
.try_next()
.verbose_instrument_await("storage_table_iter_next")
.await?
{
let (table_key, value, epoch_with_gap) = (k.user_key.table_key, v, k.epoch_with_gap);
let row = self.row_deserializer.deserialize(value)?;
let result_row_in_value = self.mapping.project(OwnedRow::new(row));
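            // Assemble the output row in the same way as `get_row`: pk-derived columns,
            // value columns, and the optional epoch column, in output order.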
match &self.key_output_indices {
Some(key_output_indices) => {
let result_row_in_key = match self.pk_serializer.clone() {
Some(pk_serializer) => {
let pk = pk_serializer.deserialize(table_key.key_part().as_ref())?;
pk.project(&self.output_row_in_key_indices).into_owned_row()
}
None => OwnedRow::empty(),
};
let mut result_row_vec = vec![];
for idx in &self.output_indices {
if let Some(epoch_idx) = self.epoch_idx
&& *idx == epoch_idx
{
let epoch = Epoch::from(epoch_with_gap.pure_epoch());
result_row_vec
.push(risingwave_common::types::Datum::from(epoch.as_scalar()));
} else if self.value_output_indices.contains(idx) {
let item_position_in_value_indices = &self
.value_output_indices
.iter()
.position(|p| idx == p)
.unwrap();
result_row_vec.push(
result_row_in_value
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
} else {
let item_position_in_pk_indices =
key_output_indices.iter().position(|p| idx == p).unwrap();
result_row_vec.push(
result_row_in_key
.datum_at(item_position_in_pk_indices)
.to_owned_datum(),
);
}
}
let row = OwnedRow::new(result_row_vec);
yield KeyedRow {
vnode_prefixed_key: table_key.copy_into(),
row,
}
}
None => match &self.epoch_idx {
Some(epoch_idx) => {
let mut result_row_vec = vec![];
for idx in &self.output_indices {
if idx == epoch_idx {
let epoch = Epoch::from(epoch_with_gap.pure_epoch());
result_row_vec
.push(risingwave_common::types::Datum::from(epoch.as_scalar()));
} else {
let item_position_in_value_indices = &self
.value_output_indices
.iter()
.position(|p| idx == p)
.unwrap();
result_row_vec.push(
result_row_in_value
.datum_at(*item_position_in_value_indices)
.to_owned_datum(),
);
}
}
let row = OwnedRow::new(result_row_vec);
yield KeyedRow {
vnode_prefixed_key: table_key.copy_into(),
row,
}
}
None => {
yield KeyedRow {
vnode_prefixed_key: table_key.copy_into(),
row: result_row_in_value.into_owned_row(),
}
}
},
}
}
}
}
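
/// [`StorageTableInnerIterLogInner`] iterates on the change log of a single key range.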
struct StorageTableInnerIterLogInner<S: StateStore, SD: ValueRowSerde> {
iter: S::ChangeLogIter,
mapping: Arc<ColumnMapping>,
row_deserializer: Arc<SD>,
}
impl<S: StateStore, SD: ValueRowSerde> StorageTableInnerIterLogInner<S, SD> {
#[allow(clippy::too_many_arguments)]
async fn new(
store: &S,
mapping: Arc<ColumnMapping>,
row_deserializer: Arc<SD>,
table_key_range: TableKeyRange,
read_options: ReadLogOptions,
start_epoch: u64,
end_epoch: HummockReadEpoch,
) -> StorageResult<Self> {
store
.try_wait_epoch(
end_epoch,
TryWaitEpochOptions {
table_id: read_options.table_id,
},
)
.await?;
let iter = store
.iter_log(
(start_epoch, end_epoch.get_epoch()),
table_key_range,
read_options,
)
.await?;
let iter = Self {
iter,
mapping,
row_deserializer,
};
Ok(iter)
}
fn into_stream(self) -> impl Stream<Item = StorageResult<KeyedChangeLogRow<Bytes>>> {
self.iter.into_stream(move |(table_key, value)| {
value
.try_map(|value| {
let full_row = self.row_deserializer.deserialize(value)?;
let row = self
.mapping
.project(OwnedRow::new(full_row))
.into_owned_row();
Ok(row)
})
.map(|row| KeyedChangeLogRow {
vnode_prefixed_key: table_key.copy_into(),
row,
})
})
}
}