use std::cmp::min;
use std::default::Default;
use std::fmt::{Debug, Formatter};
use std::future::Future;
use std::marker::PhantomData;
use std::ops::Bound;
use std::sync::{Arc, LazyLock};
use bytes::Bytes;
use futures::{Stream, TryFutureExt, TryStreamExt};
use futures_async_stream::try_stream;
use prost::Message;
use risingwave_common::array::Op;
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{TableId, TableOption};
use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::{Epoch, EpochPair};
use risingwave_hummock_sdk::key::{FullKey, TableKey, TableKeyRange};
use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_hummock_trace::{
TracedInitOptions, TracedNewLocalOptions, TracedOpConsistencyLevel, TracedPrefetchOptions,
TracedReadOptions, TracedSealCurrentEpochOptions, TracedTryWaitEpochOptions,
TracedWriteOptions,
};
use risingwave_pb::hummock::PbVnodeWatermark;
use crate::error::{StorageError, StorageResult};
use crate::hummock::CachePolicy;
use crate::monitor::{MonitoredStateStore, MonitoredStorageMetrics};
use crate::storage_value::StorageValue;
pub trait StaticSendSync = Send + Sync + 'static;
pub trait IterItem: Send + 'static {
type ItemRef<'a>: Send + Copy + 'a;
}
impl IterItem for StateStoreKeyedRow {
type ItemRef<'a> = StateStoreKeyedRowRef<'a>;
}
impl IterItem for StateStoreReadLogItem {
type ItemRef<'a> = StateStoreReadLogItemRef<'a>;
}
pub trait StateStoreIter<T: IterItem = StateStoreKeyedRow>: Send {
fn try_next(
&mut self,
) -> impl Future<Output = StorageResult<Option<T::ItemRef<'_>>>> + Send + '_;
}
pub fn to_owned_item((key, value): StateStoreKeyedRowRef<'_>) -> StorageResult<StateStoreKeyedRow> {
Ok((key.copy_into(), Bytes::copy_from_slice(value)))
}
pub trait StateStoreIterExt<T: IterItem = StateStoreKeyedRow>: StateStoreIter<T> + Sized {
type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>>: Stream<Item = StorageResult<O>>
+ Send;
fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
self,
f: F,
) -> Self::ItemStream<O, F>;
fn fused(self) -> FusedStateStoreIter<Self, T> {
FusedStateStoreIter::new(self)
}
}
#[try_stream(ok = O, error = StorageError)]
async fn into_stream_inner<
T: IterItem,
I: StateStoreIter<T>,
O: Send,
F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send,
>(
iter: I,
f: F,
) {
let mut iter = iter.fused();
while let Some(item) = iter.try_next().await? {
yield f(item)?;
}
}
pub struct FromStreamStateStoreIter<S> {
inner: S,
item_buffer: Option<StateStoreKeyedRow>,
}
impl<S> FromStreamStateStoreIter<S> {
pub fn new(inner: S) -> Self {
Self {
inner,
item_buffer: None,
}
}
}
impl<S: Stream<Item = StorageResult<StateStoreKeyedRow>> + Unpin + Send> StateStoreIter
for FromStreamStateStoreIter<S>
{
async fn try_next(&mut self) -> StorageResult<Option<StateStoreKeyedRowRef<'_>>> {
self.item_buffer = self.inner.try_next().await?;
Ok(self
.item_buffer
.as_ref()
.map(|(key, value)| (key.to_ref(), value.as_ref())))
}
}
pub struct FusedStateStoreIter<I, T> {
inner: I,
finished: bool,
_phantom: PhantomData<T>,
}
impl<I, T> FusedStateStoreIter<I, T> {
fn new(inner: I) -> Self {
Self {
inner,
finished: false,
_phantom: PhantomData,
}
}
}
impl<T: IterItem, I: StateStoreIter<T>> FusedStateStoreIter<I, T> {
async fn try_next(&mut self) -> StorageResult<Option<T::ItemRef<'_>>> {
assert!(!self.finished, "call try_next after finish");
let result = self.inner.try_next().await;
match &result {
Ok(Some(_)) => {}
Ok(None) | Err(_) => {
self.finished = true;
}
}
result
}
}
impl<T: IterItem, I: StateStoreIter<T>> StateStoreIterExt<T> for I {
type ItemStream<O: Send, F: Send + for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O>> =
impl Stream<Item = StorageResult<O>> + Send;
fn into_stream<O: Send, F: for<'a> Fn(T::ItemRef<'a>) -> StorageResult<O> + Send>(
self,
f: F,
) -> Self::ItemStream<O, F> {
into_stream_inner(self, f)
}
}
pub type StateStoreKeyedRowRef<'a> = (FullKey<&'a [u8]>, &'a [u8]);
pub type StateStoreKeyedRow = (FullKey<Bytes>, Bytes);
pub trait StateStoreReadIter = StateStoreIter + 'static;
#[derive(Clone, Copy, Eq, PartialEq, Debug)]
pub enum ChangeLogValue<T> {
Insert(T),
Update { new_value: T, old_value: T },
Delete(T),
}
impl<T> ChangeLogValue<T> {
pub fn try_map<O>(self, f: impl Fn(T) -> StorageResult<O>) -> StorageResult<ChangeLogValue<O>> {
Ok(match self {
ChangeLogValue::Insert(value) => ChangeLogValue::Insert(f(value)?),
ChangeLogValue::Update {
new_value,
old_value,
} => ChangeLogValue::Update {
new_value: f(new_value)?,
old_value: f(old_value)?,
},
ChangeLogValue::Delete(value) => ChangeLogValue::Delete(f(value)?),
})
}
pub fn into_op_value_iter(self) -> impl Iterator<Item = (Op, T)> {
std::iter::from_coroutine(
#[coroutine]
move || match self {
Self::Insert(row) => {
yield (Op::Insert, row);
}
Self::Delete(row) => {
yield (Op::Delete, row);
}
Self::Update {
old_value,
new_value,
} => {
yield (Op::UpdateDelete, old_value);
yield (Op::UpdateInsert, new_value);
}
},
)
}
}
impl<T: AsRef<[u8]>> ChangeLogValue<T> {
pub fn to_ref(&self) -> ChangeLogValue<&[u8]> {
match self {
ChangeLogValue::Insert(val) => ChangeLogValue::Insert(val.as_ref()),
ChangeLogValue::Update {
new_value,
old_value,
} => ChangeLogValue::Update {
new_value: new_value.as_ref(),
old_value: old_value.as_ref(),
},
ChangeLogValue::Delete(val) => ChangeLogValue::Delete(val.as_ref()),
}
}
}
pub type StateStoreReadLogItem = (TableKey<Bytes>, ChangeLogValue<Bytes>);
pub type StateStoreReadLogItemRef<'a> = (TableKey<&'a [u8]>, ChangeLogValue<&'a [u8]>);
#[derive(Clone)]
pub struct ReadLogOptions {
pub table_id: TableId,
}
pub trait StateStoreReadChangeLogIter = StateStoreIter<StateStoreReadLogItem> + Send + 'static;
pub trait StateStoreRead: StaticSendSync {
type Iter: StateStoreReadIter;
type RevIter: StateStoreReadIter;
type ChangeLogIter: StateStoreReadChangeLogIter;
fn get_keyed_row(
&self,
key: TableKey<Bytes>,
epoch: u64,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Option<StateStoreKeyedRow>>> + Send + '_;
fn get(
&self,
key: TableKey<Bytes>,
epoch: u64,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Option<Bytes>>> + Send + '_ {
self.get_keyed_row(key, epoch, read_options)
.map_ok(|v| v.map(|(_, v)| v))
}
fn iter(
&self,
key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::Iter>> + Send + '_;
fn rev_iter(
&self,
key_range: TableKeyRange,
epoch: u64,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter>> + Send + '_;
fn iter_log(
&self,
epoch_range: (u64, u64),
key_range: TableKeyRange,
options: ReadLogOptions,
) -> impl Future<Output = StorageResult<Self::ChangeLogIter>> + Send + '_;
}
pub trait StateStoreReadExt: StaticSendSync {
fn scan(
&self,
key_range: TableKeyRange,
epoch: u64,
limit: Option<usize>,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Vec<StateStoreKeyedRow>>> + Send + '_;
}
impl<S: StateStoreRead> StateStoreReadExt for S {
async fn scan(
&self,
key_range: TableKeyRange,
epoch: u64,
limit: Option<usize>,
mut read_options: ReadOptions,
) -> StorageResult<Vec<StateStoreKeyedRow>> {
if limit.is_some() {
read_options.prefetch_options.prefetch = false;
}
const MAX_INITIAL_CAP: usize = 1024;
let limit = limit.unwrap_or(usize::MAX);
let mut ret = Vec::with_capacity(min(limit, MAX_INITIAL_CAP));
let mut iter = self.iter(key_range, epoch, read_options).await?;
while let Some((key, value)) = iter.try_next().await? {
ret.push((key.copy_into(), Bytes::copy_from_slice(value)))
}
Ok(ret)
}
}
pub trait StateStoreWrite: StaticSendSync {
fn ingest_batch(
&self,
kv_pairs: Vec<(TableKey<Bytes>, StorageValue)>,
delete_ranges: Vec<(Bound<Bytes>, Bound<Bytes>)>,
write_options: WriteOptions,
) -> StorageResult<usize>;
}
#[derive(Clone)]
pub struct TryWaitEpochOptions {
pub table_id: TableId,
}
impl TryWaitEpochOptions {
#[cfg(any(test, feature = "test"))]
pub fn for_test(table_id: TableId) -> Self {
Self { table_id }
}
}
impl From<TracedTryWaitEpochOptions> for TryWaitEpochOptions {
fn from(value: TracedTryWaitEpochOptions) -> Self {
Self {
table_id: value.table_id.into(),
}
}
}
impl From<TryWaitEpochOptions> for TracedTryWaitEpochOptions {
fn from(value: TryWaitEpochOptions) -> Self {
Self {
table_id: value.table_id.into(),
}
}
}
pub trait StateStore: StateStoreRead + StaticSendSync + Clone {
type Local: LocalStateStore;
fn try_wait_epoch(
&self,
epoch: HummockReadEpoch,
options: TryWaitEpochOptions,
) -> impl Future<Output = StorageResult<()>> + Send + '_;
fn monitored(self, storage_metrics: Arc<MonitoredStorageMetrics>) -> MonitoredStateStore<Self> {
MonitoredStateStore::new(self, storage_metrics)
}
fn new_local(&self, option: NewLocalOptions) -> impl Future<Output = Self::Local> + Send + '_;
}
pub trait LocalStateStore: StaticSendSync {
type Iter<'a>: StateStoreIter + 'a;
type RevIter<'a>: StateStoreIter + 'a;
fn get(
&self,
key: TableKey<Bytes>,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Option<Bytes>>> + Send + '_;
fn iter(
&self,
key_range: TableKeyRange,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::Iter<'_>>> + Send + '_;
fn rev_iter(
&self,
key_range: TableKeyRange,
read_options: ReadOptions,
) -> impl Future<Output = StorageResult<Self::RevIter<'_>>> + Send + '_;
fn get_table_watermark(&self, vnode: VirtualNode) -> Option<Bytes>;
fn insert(
&mut self,
key: TableKey<Bytes>,
new_val: Bytes,
old_val: Option<Bytes>,
) -> StorageResult<()>;
fn delete(&mut self, key: TableKey<Bytes>, old_val: Bytes) -> StorageResult<()>;
fn flush(&mut self) -> impl Future<Output = StorageResult<usize>> + Send + '_;
fn try_flush(&mut self) -> impl Future<Output = StorageResult<()>> + Send + '_;
fn epoch(&self) -> u64;
fn is_dirty(&self) -> bool;
fn init(&mut self, opts: InitOptions) -> impl Future<Output = StorageResult<()>> + Send + '_;
fn seal_current_epoch(&mut self, next_epoch: u64, opts: SealCurrentEpochOptions);
fn update_vnode_bitmap(&mut self, vnodes: Arc<Bitmap>) -> Arc<Bitmap>;
}
#[derive(Default, Clone, Copy)]
pub struct PrefetchOptions {
pub prefetch: bool,
pub for_large_query: bool,
}
impl PrefetchOptions {
pub fn prefetch_for_large_range_scan() -> Self {
Self {
prefetch: true,
for_large_query: true,
}
}
pub fn prefetch_for_small_range_scan() -> Self {
Self {
prefetch: true,
for_large_query: false,
}
}
pub fn new(prefetch: bool, for_large_query: bool) -> Self {
Self {
prefetch,
for_large_query,
}
}
}
impl From<TracedPrefetchOptions> for PrefetchOptions {
fn from(value: TracedPrefetchOptions) -> Self {
Self {
prefetch: value.prefetch,
for_large_query: value.for_large_query,
}
}
}
impl From<PrefetchOptions> for TracedPrefetchOptions {
fn from(value: PrefetchOptions) -> Self {
Self {
prefetch: value.prefetch,
for_large_query: value.for_large_query,
}
}
}
#[derive(Default, Clone)]
pub struct ReadOptions {
pub prefix_hint: Option<Bytes>,
pub prefetch_options: PrefetchOptions,
pub cache_policy: CachePolicy,
pub retention_seconds: Option<u32>,
pub table_id: TableId,
pub read_version_from_backup: bool,
pub read_committed: bool,
}
impl From<TracedReadOptions> for ReadOptions {
fn from(value: TracedReadOptions) -> Self {
Self {
prefix_hint: value.prefix_hint.map(|b| b.into()),
prefetch_options: value.prefetch_options.into(),
cache_policy: value.cache_policy.into(),
retention_seconds: value.retention_seconds,
table_id: value.table_id.into(),
read_version_from_backup: value.read_version_from_backup,
read_committed: value.read_committed,
}
}
}
impl From<ReadOptions> for TracedReadOptions {
fn from(value: ReadOptions) -> Self {
Self {
prefix_hint: value.prefix_hint.map(|b| b.into()),
prefetch_options: value.prefetch_options.into(),
cache_policy: value.cache_policy.into(),
retention_seconds: value.retention_seconds,
table_id: value.table_id.into(),
read_version_from_backup: value.read_version_from_backup,
read_committed: value.read_committed,
}
}
}
pub fn gen_min_epoch(base_epoch: u64, retention_seconds: Option<&u32>) -> u64 {
let base_epoch = Epoch(base_epoch);
match retention_seconds {
Some(retention_seconds_u32) => {
base_epoch
.subtract_ms(*retention_seconds_u32 as u64 * 1000)
.0
}
None => 0,
}
}
#[derive(Default, Clone)]
pub struct WriteOptions {
pub epoch: u64,
pub table_id: TableId,
}
impl From<TracedWriteOptions> for WriteOptions {
fn from(value: TracedWriteOptions) -> Self {
Self {
epoch: value.epoch,
table_id: value.table_id.into(),
}
}
}
pub trait CheckOldValueEquality = Fn(&Bytes, &Bytes) -> bool + Send + Sync;
pub static CHECK_BYTES_EQUAL: LazyLock<Arc<dyn CheckOldValueEquality>> =
LazyLock::new(|| Arc::new(|first: &Bytes, second: &Bytes| first == second));
#[derive(Default, Clone)]
pub enum OpConsistencyLevel {
#[default]
Inconsistent,
ConsistentOldValue {
check_old_value: Arc<dyn CheckOldValueEquality>,
is_log_store: bool,
},
}
impl Debug for OpConsistencyLevel {
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
match self {
OpConsistencyLevel::Inconsistent => f.write_str("OpConsistencyLevel::Inconsistent"),
OpConsistencyLevel::ConsistentOldValue { is_log_store, .. } => f
.debug_struct("OpConsistencyLevel::ConsistentOldValue")
.field("is_log_store", is_log_store)
.finish(),
}
}
}
impl PartialEq<Self> for OpConsistencyLevel {
fn eq(&self, other: &Self) -> bool {
matches!(
(self, other),
(
OpConsistencyLevel::Inconsistent,
OpConsistencyLevel::Inconsistent
) | (
OpConsistencyLevel::ConsistentOldValue {
is_log_store: true,
..
},
OpConsistencyLevel::ConsistentOldValue {
is_log_store: true,
..
},
) | (
OpConsistencyLevel::ConsistentOldValue {
is_log_store: false,
..
},
OpConsistencyLevel::ConsistentOldValue {
is_log_store: false,
..
},
)
)
}
}
impl Eq for OpConsistencyLevel {}
impl OpConsistencyLevel {
pub fn update(&mut self, new_level: &OpConsistencyLevel) {
assert_ne!(self, new_level);
*self = new_level.clone()
}
}
#[derive(Clone)]
pub struct NewLocalOptions {
pub table_id: TableId,
pub op_consistency_level: OpConsistencyLevel,
pub table_option: TableOption,
pub is_replicated: bool,
pub vnodes: Arc<Bitmap>,
}
impl From<TracedNewLocalOptions> for NewLocalOptions {
fn from(value: TracedNewLocalOptions) -> Self {
Self {
table_id: value.table_id.into(),
op_consistency_level: match value.op_consistency_level {
TracedOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
TracedOpConsistencyLevel::ConsistentOldValue => {
OpConsistencyLevel::ConsistentOldValue {
check_old_value: CHECK_BYTES_EQUAL.clone(),
is_log_store: false,
}
}
},
table_option: value.table_option.into(),
is_replicated: value.is_replicated,
vnodes: Arc::new(value.vnodes.into()),
}
}
}
impl From<NewLocalOptions> for TracedNewLocalOptions {
fn from(value: NewLocalOptions) -> Self {
Self {
table_id: value.table_id.into(),
op_consistency_level: match value.op_consistency_level {
OpConsistencyLevel::Inconsistent => TracedOpConsistencyLevel::Inconsistent,
OpConsistencyLevel::ConsistentOldValue { .. } => {
TracedOpConsistencyLevel::ConsistentOldValue
}
},
table_option: value.table_option.into(),
is_replicated: value.is_replicated,
vnodes: value.vnodes.as_ref().clone().into(),
}
}
}
impl NewLocalOptions {
pub fn new(
table_id: TableId,
op_consistency_level: OpConsistencyLevel,
table_option: TableOption,
vnodes: Arc<Bitmap>,
) -> Self {
NewLocalOptions {
table_id,
op_consistency_level,
table_option,
is_replicated: false,
vnodes,
}
}
pub fn new_replicated(
table_id: TableId,
op_consistency_level: OpConsistencyLevel,
table_option: TableOption,
vnodes: Arc<Bitmap>,
) -> Self {
NewLocalOptions {
table_id,
op_consistency_level,
table_option,
is_replicated: true,
vnodes,
}
}
pub fn for_test(table_id: TableId) -> Self {
Self {
table_id,
op_consistency_level: OpConsistencyLevel::Inconsistent,
table_option: TableOption {
retention_seconds: None,
},
is_replicated: false,
vnodes: Arc::new(Bitmap::ones(VirtualNode::COUNT_FOR_TEST)),
}
}
}
#[derive(Clone)]
pub struct InitOptions {
pub epoch: EpochPair,
}
impl InitOptions {
pub fn new(epoch: EpochPair) -> Self {
Self { epoch }
}
}
impl From<InitOptions> for TracedInitOptions {
fn from(value: InitOptions) -> Self {
TracedInitOptions {
epoch: value.epoch.into(),
}
}
}
impl From<TracedInitOptions> for InitOptions {
fn from(value: TracedInitOptions) -> Self {
InitOptions {
epoch: value.epoch.into(),
}
}
}
#[derive(Clone, Debug)]
pub struct SealCurrentEpochOptions {
pub table_watermarks: Option<(WatermarkDirection, Vec<VnodeWatermark>)>,
pub switch_op_consistency_level: Option<OpConsistencyLevel>,
}
impl From<SealCurrentEpochOptions> for TracedSealCurrentEpochOptions {
fn from(value: SealCurrentEpochOptions) -> Self {
TracedSealCurrentEpochOptions {
table_watermarks: value.table_watermarks.map(|(direction, watermarks)| {
(
direction == WatermarkDirection::Ascending,
watermarks
.into_iter()
.map(|watermark| {
let pb_watermark = PbVnodeWatermark::from(watermark);
Message::encode_to_vec(&pb_watermark)
})
.collect(),
)
}),
switch_op_consistency_level: value
.switch_op_consistency_level
.map(|level| matches!(level, OpConsistencyLevel::ConsistentOldValue { .. })),
}
}
}
impl From<TracedSealCurrentEpochOptions> for SealCurrentEpochOptions {
fn from(value: TracedSealCurrentEpochOptions) -> SealCurrentEpochOptions {
SealCurrentEpochOptions {
table_watermarks: value.table_watermarks.map(|(is_ascending, watermarks)| {
(
if is_ascending {
WatermarkDirection::Ascending
} else {
WatermarkDirection::Descending
},
watermarks
.into_iter()
.map(|serialized_watermark| {
Message::decode(serialized_watermark.as_slice())
.map(|pb: PbVnodeWatermark| VnodeWatermark::from(pb))
.expect("should not failed")
})
.collect(),
)
}),
switch_op_consistency_level: value.switch_op_consistency_level.map(|enable| {
if enable {
OpConsistencyLevel::ConsistentOldValue {
check_old_value: CHECK_BYTES_EQUAL.clone(),
is_log_store: false,
}
} else {
OpConsistencyLevel::Inconsistent
}
}),
}
}
}
impl SealCurrentEpochOptions {
pub fn for_test() -> Self {
Self {
table_watermarks: None,
switch_op_consistency_level: None,
}
}
}