use std::collections::HashMap;
use std::ops::Bound;
use std::ops::Bound::*;
use std::sync::Arc;
use bytes::{BufMut, Bytes, BytesMut};
use either::Either;
use foyer::CacheContext;
use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
use futures_async_stream::for_await;
use itertools::{izip, Itertools};
use risingwave_common::array::stream_record::Record;
use risingwave_common::array::{ArrayImplBuilder, ArrayRef, DataChunk, Op, StreamChunk};
use risingwave_common::bitmap::Bitmap;
use risingwave_common::catalog::{
get_dist_key_in_pk_indices, ColumnDesc, ColumnId, TableId, TableOption,
};
use risingwave_common::hash::{VirtualNode, VnodeBitmapExt, VnodeCountCompat};
use risingwave_common::row::{self, once, CompactedRow, Once, OwnedRow, Row, RowExt};
use risingwave_common::types::{DataType, Datum, DefaultOrd, DefaultOrdered, ScalarImpl};
use risingwave_common::util::column_index_mapping::ColIndexMapping;
use risingwave_common::util::epoch::EpochPair;
use risingwave_common::util::iter_util::{ZipEqDebug, ZipEqFast};
use risingwave_common::util::row_serde::OrderedRowSerde;
use risingwave_common::util::sort_util::OrderType;
use risingwave_common::util::value_encoding::BasicSerde;
use risingwave_hummock_sdk::key::{
end_bound_of_prefix, prefixed_range_with_vnode, start_bound_of_excluded_prefix, TableKey,
TableKeyRange,
};
use risingwave_hummock_sdk::table_watermark::{VnodeWatermark, WatermarkDirection};
use risingwave_pb::catalog::Table;
use risingwave_storage::error::{ErrorKind, StorageError};
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::mem_table::MemTableError;
use risingwave_storage::row_serde::find_columns_by_ids;
use risingwave_storage::row_serde::row_serde_util::{
deserialize_pk_with_vnode, serialize_pk, serialize_pk_with_vnode,
};
use risingwave_storage::row_serde::value_serde::ValueRowSerde;
use risingwave_storage::store::{
InitOptions, LocalStateStore, NewLocalOptions, OpConsistencyLevel, PrefetchOptions,
ReadLogOptions, ReadOptions, SealCurrentEpochOptions, StateStoreIter, StateStoreIterExt,
};
use risingwave_storage::table::merge_sort::merge_sort;
use risingwave_storage::table::{
deserialize_log_stream, ChangeLogRow, KeyedRow, TableDistribution,
};
use risingwave_storage::StateStore;
use thiserror_ext::AsReport;
use tracing::{trace, Instrument};
use crate::cache::cache_may_stale;
use crate::common::state_cache::{StateCache, StateCacheFiller};
use crate::common::table::state_table_cache::StateTableWatermarkCache;
use crate::executor::{StreamExecutorError, StreamExecutorResult};
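/// Maximum number of primary keys kept in the watermark cache for state cleaning.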
const WATERMARK_CACHE_ENTRIES: usize = 16;
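/// When the "insane" consistency mode is enabled, randomly discard the current write
/// (with 30% probability) by returning early from the enclosing function.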
macro_rules! insane_mode_discard_point {
() => {{
use rand::Rng;
if crate::consistency::insane() && rand::thread_rng().gen_bool(0.3) {
return;
}
}};
}
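/// `StateTableInner` is the interface used by streaming executors to read and write
/// relational state in a KV state store (`StateStore`) with row-based encoding.
/// It is parameterized by the value serde `SD`, whether the table is replicated,
/// and whether the watermark cache is enabled.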
#[derive(Clone)]
pub struct StateTableInner<
S,
SD = BasicSerde,
const IS_REPLICATED: bool = false,
const USE_WATERMARK_CACHE: bool = false,
> where
S: StateStore,
SD: ValueRowSerde,
{
    /// Id of the table.
    table_id: TableId,
    /// Local state store, buffering writes for the current epoch.
    local_store: S::Local,
    /// Global state store, used for reading the change log.
    store: S,
    /// Serde of the memcomparable-encoded primary key.
    pk_serde: OrderedRowSerde,
    /// Serde of the row value.
    row_serde: SD,
    /// Indices of the primary key columns in the full table schema.
    pk_indices: Vec<usize>,
    /// Vnode distribution used to compute the vnode of each key.
    distribution: TableDistribution,
    /// Number of leading pk columns used as the read prefix (bloom filter) hint.
    prefix_hint_len: usize,
    /// Table options, e.g. data retention.
    table_option: TableOption,
    /// Indices of the columns stored in the value; `None` means all columns in order.
    value_indices: Option<Vec<usize>>,
    /// Watermark recorded by `update_watermark`, applied on the next commit for state cleaning.
    pending_watermark: Option<ScalarImpl>,
    /// Latest committed watermark for state cleaning.
    committed_watermark: Option<ScalarImpl>,
    /// Cache of the smallest primary keys, used to skip unnecessary state cleaning.
    watermark_cache: StateTableWatermarkCache,
    /// Data types of all columns; used to rebuild chunks for replicated tables.
    data_types: Vec<DataType>,
    /// Mapping from the full table schema to the output columns; only used when replicated.
    i2o_mapping: ColIndexMapping,
    /// Output column indices; only used when replicated.
    output_indices: Vec<usize>,
    op_consistency_level: StateTableOpConsistencyLevel,
}
pub type StateTable<S> = StateTableInner<S, BasicSerde>;
pub type ReplicatedStateTable<S, SD> = StateTableInner<S, SD, true>;
pub type WatermarkCacheStateTable<S> = StateTableInner<S, BasicSerde, false, true>;
pub type WatermarkCacheParameterizedStateTable<S, const USE_WATERMARK_CACHE: bool> =
StateTableInner<S, BasicSerde, false, USE_WATERMARK_CACHE>;
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
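    /// Initialize the underlying local state store with the given epoch pair.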
pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
self.local_store.init(InitOptions::new(epoch)).await?;
Ok(())
}
pub fn state_store(&self) -> &S {
&self.store
}
}
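/// Build an `OpConsistencyLevel::ConsistentOldValue` whose checker first compares the old
/// value bytes and, on mismatch, falls back to comparing the deserialized rows, logging
/// details when the sanity check fails.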
fn consistent_old_value_op(
row_serde: impl ValueRowSerde,
is_log_store: bool,
) -> OpConsistencyLevel {
OpConsistencyLevel::ConsistentOldValue {
check_old_value: Arc::new(move |first: &Bytes, second: &Bytes| {
if first == second {
return true;
}
let first = match row_serde.deserialize(first) {
Ok(rows) => rows,
Err(e) => {
                    error!(error = %e.as_report(), value = ?first, "failed to deserialize serialized value");
return false;
}
};
let second = match row_serde.deserialize(second) {
Ok(rows) => rows,
Err(e) => {
                    error!(error = %e.as_report(), value = ?second, "failed to deserialize serialized value");
return false;
}
};
if first != second {
                error!(first = ?first, second = ?second, "sanity check failed");
false
} else {
true
}
}),
is_log_store,
}
}
#[derive(Eq, PartialEq, Copy, Clone, Debug)]
pub enum StateTableOpConsistencyLevel {
    /// Write operations are not sanity-checked.
    Inconsistent,
    /// Write operations are sanity-checked against the existing old value: an insert must not
    /// overwrite an existing key, and a delete or update must match the stored old value.
    ConsistentOldValue,
    /// Same checks as `ConsistentOldValue`; additionally old values are kept in the state
    /// store so that the table's change log can be read.
    LogStoreEnabled,
}
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
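    /// Create a state table from the table catalog, with the default
    /// `ConsistentOldValue` op consistency level.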
pub async fn from_table_catalog(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
) -> Self {
Self::from_table_catalog_with_consistency_level(
table_catalog,
store,
vnodes,
StateTableOpConsistencyLevel::ConsistentOldValue,
)
.await
}
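    /// Create a state table with op consistency (sanity) checks disabled.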
pub async fn from_table_catalog_inconsistent_op(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
) -> Self {
Self::from_table_catalog_with_consistency_level(
table_catalog,
store,
vnodes,
StateTableOpConsistencyLevel::Inconsistent,
)
.await
}
pub async fn from_table_catalog_with_consistency_level(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
consistency_level: StateTableOpConsistencyLevel,
) -> Self {
Self::from_table_catalog_inner(table_catalog, store, vnodes, consistency_level, vec![])
.await
}
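    /// Shared constructor: builds the pk and value serdes, the vnode distribution and the
    /// local state store, and restores the committed watermark. A non-empty
    /// `output_column_ids` is only passed for replicated state tables.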
async fn from_table_catalog_inner(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
op_consistency_level: StateTableOpConsistencyLevel,
output_column_ids: Vec<ColumnId>,
) -> Self {
let table_id = TableId::new(table_catalog.id);
let table_columns: Vec<ColumnDesc> = table_catalog
.columns
.iter()
.map(|col| col.column_desc.as_ref().unwrap().into())
.collect();
let data_types: Vec<DataType> = table_catalog
.columns
.iter()
.map(|col| {
col.get_column_desc()
.unwrap()
.get_column_type()
.unwrap()
.into()
})
.collect();
let order_types: Vec<OrderType> = table_catalog
.pk
.iter()
.map(|col_order| OrderType::from_protobuf(col_order.get_order_type().unwrap()))
.collect();
let dist_key_indices: Vec<usize> = table_catalog
.distribution_key
.iter()
.map(|dist_index| *dist_index as usize)
.collect();
let pk_indices = table_catalog
.pk
.iter()
.map(|col_order| col_order.column_index as usize)
.collect_vec();
let dist_key_in_pk_indices = if table_catalog.get_dist_key_in_pk().is_empty() {
get_dist_key_in_pk_indices(&dist_key_indices, &pk_indices).unwrap()
} else {
table_catalog
.get_dist_key_in_pk()
.iter()
.map(|idx| *idx as usize)
.collect()
};
let vnode_col_idx_in_pk = table_catalog.vnode_col_index.as_ref().and_then(|idx| {
let vnode_col_idx = *idx as usize;
pk_indices.iter().position(|&i| vnode_col_idx == i)
});
let distribution =
TableDistribution::new(vnodes, dist_key_in_pk_indices, vnode_col_idx_in_pk);
assert_eq!(
distribution.vnode_count(),
table_catalog.vnode_count(),
"vnode count mismatch, scanning table {} under wrong distribution?",
table_catalog.name,
);
let pk_data_types = pk_indices
.iter()
.map(|i| table_columns[*i].data_type.clone())
.collect();
let pk_serde = OrderedRowSerde::new(pk_data_types, order_types);
let input_value_indices = table_catalog
.value_indices
.iter()
.map(|val| *val as usize)
.collect_vec();
let no_shuffle_value_indices = (0..table_columns.len()).collect_vec();
let value_indices = match input_value_indices.len() == table_columns.len()
&& input_value_indices == no_shuffle_value_indices
{
true => None,
false => Some(input_value_indices),
};
let prefix_hint_len = table_catalog.read_prefix_len_hint as usize;
let make_row_serde = || {
SD::new(
Arc::from_iter(table_catalog.value_indices.iter().map(|val| *val as usize)),
Arc::from(table_columns.clone().into_boxed_slice()),
)
};
let state_table_op_consistency_level = op_consistency_level;
let op_consistency_level = match op_consistency_level {
StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
StateTableOpConsistencyLevel::ConsistentOldValue => {
let row_serde = make_row_serde();
consistent_old_value_op(row_serde, false)
}
StateTableOpConsistencyLevel::LogStoreEnabled => {
let row_serde = make_row_serde();
consistent_old_value_op(row_serde, true)
}
};
let table_option = TableOption::new(table_catalog.retention_seconds);
let new_local_options = if IS_REPLICATED {
NewLocalOptions::new_replicated(
table_id,
op_consistency_level,
table_option,
distribution.vnodes().clone(),
)
} else {
NewLocalOptions::new(
table_id,
op_consistency_level,
table_option,
distribution.vnodes().clone(),
)
};
let local_state_store = store.new_local(new_local_options).await;
let row_serde = make_row_serde();
assert_eq!(
table_catalog.version.is_some(),
row_serde.kind().is_column_aware()
);
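        // Restore the committed state-cleaning watermark, if any, from the maximum
        // per-vnode table watermark persisted in the state store.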
let prefix_deser = if pk_indices.is_empty() {
None
} else {
Some(pk_serde.prefix(1))
};
let max_watermark_of_vnodes = distribution
.vnodes()
.iter_vnodes()
.filter_map(|vnode| local_state_store.get_table_watermark(vnode))
.max();
let committed_watermark = if let Some(deser) = prefix_deser
&& let Some(max_watermark) = max_watermark_of_vnodes
{
let deserialized = deser
.deserialize(&max_watermark)
.ok()
.and_then(|row| row[0].clone());
if deserialized.is_none() {
tracing::error!(
vnodes = ?distribution.vnodes(),
watermark = ?max_watermark,
"Failed to deserialize persisted watermark from state store.",
);
}
deserialized
} else {
None
};
let watermark_cache = if USE_WATERMARK_CACHE {
StateTableWatermarkCache::new(WATERMARK_CACHE_ENTRIES)
} else {
StateTableWatermarkCache::new(0)
};
let output_column_ids_to_input_idx = output_column_ids
.iter()
.enumerate()
.map(|(pos, id)| (*id, pos))
.collect::<HashMap<_, _>>();
let columns: Vec<ColumnDesc> = table_catalog
.columns
.iter()
.map(|c| c.column_desc.as_ref().unwrap().into())
.collect_vec();
let mut i2o_mapping = vec![None; columns.len()];
for (i, column) in columns.iter().enumerate() {
if let Some(pos) = output_column_ids_to_input_idx.get(&column.column_id) {
i2o_mapping[i] = Some(*pos);
}
}
let i2o_mapping = ColIndexMapping::new(i2o_mapping, output_column_ids.len());
let (_, output_indices) = find_columns_by_ids(&columns[..], &output_column_ids);
Self {
table_id,
local_store: local_state_store,
store,
pk_serde,
row_serde,
pk_indices,
distribution,
prefix_hint_len,
table_option,
value_indices,
pending_watermark: None,
committed_watermark,
watermark_cache,
data_types,
output_indices,
i2o_mapping,
op_consistency_level: state_table_op_consistency_level,
}
}
pub fn get_data_types(&self) -> &[DataType] {
&self.data_types
}
pub fn table_id(&self) -> u32 {
self.table_id.table_id
}
pub fn epoch(&self) -> u64 {
self.local_store.epoch()
}
fn compute_prefix_vnode(&self, pk_prefix: impl Row) -> VirtualNode {
self.distribution
.try_compute_vnode_by_pk_prefix(pk_prefix)
.expect("For streaming, the given prefix must be enough to calculate the vnode")
}
pub fn compute_vnode_by_pk(&self, pk: impl Row) -> VirtualNode {
self.distribution.compute_vnode_by_pk(pk)
}
pub fn pk_indices(&self) -> &[usize] {
&self.pk_indices
}
pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
assert!(IS_REPLICATED);
self.pk_indices
.iter()
.map(|&i| self.output_indices.iter().position(|&j| i == j))
.collect()
}
pub fn pk_serde(&self) -> &OrderedRowSerde {
&self.pk_serde
}
pub fn vnodes(&self) -> &Arc<Bitmap> {
self.distribution.vnodes()
}
pub fn value_indices(&self) -> &Option<Vec<usize>> {
&self.value_indices
}
fn is_dirty(&self) -> bool {
self.local_store.is_dirty() || self.pending_watermark.is_some()
}
pub fn is_consistent_op(&self) -> bool {
matches!(
self.op_consistency_level,
StateTableOpConsistencyLevel::ConsistentOldValue
| StateTableOpConsistencyLevel::LogStoreEnabled
)
}
}
impl<S, SD, const USE_WATERMARK_CACHE: bool> StateTableInner<S, SD, true, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
pub async fn from_table_catalog_with_output_column_ids(
table_catalog: &Table,
store: S,
vnodes: Option<Arc<Bitmap>>,
output_column_ids: Vec<ColumnId>,
) -> Self {
Self::from_table_catalog_inner(
table_catalog,
store,
vnodes,
StateTableOpConsistencyLevel::Inconsistent,
output_column_ids,
)
.await
}
}
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
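    /// Get a single row by primary key and deserialize it into an `OwnedRow`.
    /// For replicated tables, the row is projected to the output columns.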
pub async fn get_row(&self, pk: impl Row) -> StreamExecutorResult<Option<OwnedRow>> {
let encoded_row: Option<Bytes> = self.get_encoded_row(pk).await?;
match encoded_row {
Some(encoded_row) => {
let row = self.row_serde.deserialize(&encoded_row)?;
if IS_REPLICATED {
let row = row.project(&self.output_indices);
Ok(Some(row.into_owned_row()))
} else {
Ok(Some(OwnedRow::new(row)))
}
}
None => Ok(None),
}
}
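    /// Get the value-encoded bytes of a row by primary key without deserializing it.
    /// A prefix hint is attached when the pk length equals `prefix_hint_len`.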
pub async fn get_encoded_row(&self, pk: impl Row) -> StreamExecutorResult<Option<Bytes>> {
assert!(pk.len() <= self.pk_indices.len());
let serialized_pk =
serialize_pk_with_vnode(&pk, &self.pk_serde, self.compute_vnode_by_pk(&pk));
let prefix_hint = if self.prefix_hint_len != 0 && self.prefix_hint_len == pk.len() {
Some(serialized_pk.slice(VirtualNode::SIZE..))
} else {
#[cfg(debug_assertions)]
if self.prefix_hint_len != 0 {
warn!("prefix_hint_len is not equal to pk.len(), may not be able to utilize bloom filter");
}
None
};
let read_options = ReadOptions {
prefix_hint,
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
};
self.local_store
.get(serialized_pk, read_options)
.await
.map_err(Into::into)
}
pub async fn get_compacted_row(
&self,
pk: impl Row,
) -> StreamExecutorResult<Option<CompactedRow>> {
if self.row_serde.kind().is_basic() {
self.get_encoded_row(pk)
.await
.map(|bytes| bytes.map(CompactedRow::new))
} else {
self.get_row(pk)
.await
.map(|row| row.map(CompactedRow::from))
}
}
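    /// Update the vnode bitmap of the state table. Returns the previous vnode bitmap and
    /// whether the executor's cache may be stale after the update. Must only be called
    /// when the table has no pending writes or watermark.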
#[must_use = "the executor should decide whether to manipulate the cache based on the previous vnode bitmap"]
pub fn update_vnode_bitmap(&mut self, new_vnodes: Arc<Bitmap>) -> (Arc<Bitmap>, bool) {
assert!(
!self.is_dirty(),
"vnode bitmap should only be updated when state table is clean"
);
let prev_vnodes = self.local_store.update_vnode_bitmap(new_vnodes.clone());
assert_eq!(
&prev_vnodes,
self.vnodes(),
"state table and state store vnode bitmap mismatches"
);
if self.distribution.is_singleton() {
assert_eq!(
&new_vnodes,
self.vnodes(),
"should not update vnode bitmap for singleton table"
);
}
assert_eq!(self.vnodes().len(), new_vnodes.len());
let cache_may_stale = cache_may_stale(self.vnodes(), &new_vnodes);
if cache_may_stale {
self.pending_watermark = None;
if USE_WATERMARK_CACHE {
self.watermark_cache.clear();
}
}
(
self.distribution.update_vnode_bitmap(new_vnodes),
cache_may_stale,
)
}
}
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
fn handle_mem_table_error(&self, e: StorageError) {
let e = match e.into_inner() {
ErrorKind::MemTable(e) => e,
_ => unreachable!("should only get memtable error"),
};
match *e {
MemTableError::InconsistentOperation { key, prev, new } => {
let (vnode, key) = deserialize_pk_with_vnode(&key, &self.pk_serde).unwrap();
panic!(
"mem-table operation inconsistent! table_id: {}, vnode: {}, key: {:?}, prev: {}, new: {}",
self.table_id(),
vnode,
&key,
prev.debug_fmt(&self.row_serde),
new.debug_fmt(&self.row_serde),
)
}
}
}
fn serialize_value(&self, value: impl Row) -> Bytes {
if let Some(value_indices) = self.value_indices.as_ref() {
self.row_serde
.serialize(value.project(value_indices))
.into()
} else {
self.row_serde.serialize(value).into()
}
}
fn insert_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
insane_mode_discard_point!();
self.local_store
.insert(key, value_bytes, None)
.unwrap_or_else(|e| self.handle_mem_table_error(e));
}
fn delete_inner(&mut self, key: TableKey<Bytes>, value_bytes: Bytes) {
insane_mode_discard_point!();
self.local_store
.delete(key, value_bytes)
.unwrap_or_else(|e| self.handle_mem_table_error(e));
}
fn update_inner(
&mut self,
key_bytes: TableKey<Bytes>,
old_value_bytes: Option<Bytes>,
new_value_bytes: Bytes,
) {
insane_mode_discard_point!();
self.local_store
.insert(key_bytes, new_value_bytes, old_value_bytes)
.unwrap_or_else(|e| self.handle_mem_table_error(e));
}
pub fn insert(&mut self, value: impl Row) {
let pk_indices = &self.pk_indices;
let pk = (&value).project(pk_indices);
if USE_WATERMARK_CACHE {
self.watermark_cache.insert(&pk);
}
let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
let value_bytes = self.serialize_value(value);
self.insert_inner(key_bytes, value_bytes);
}
pub fn delete(&mut self, old_value: impl Row) {
let pk_indices = &self.pk_indices;
let pk = (&old_value).project(pk_indices);
if USE_WATERMARK_CACHE {
self.watermark_cache.delete(&pk);
}
let key_bytes = serialize_pk_with_vnode(pk, &self.pk_serde, self.compute_vnode_by_pk(pk));
let value_bytes = self.serialize_value(old_value);
self.delete_inner(key_bytes, value_bytes);
}
pub fn update(&mut self, old_value: impl Row, new_value: impl Row) {
let old_pk = (&old_value).project(self.pk_indices());
let new_pk = (&new_value).project(self.pk_indices());
debug_assert!(
Row::eq(&old_pk, new_pk),
"pk should not change: {old_pk:?} vs {new_pk:?}",
);
let new_key_bytes =
serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
let old_value_bytes = self.serialize_value(old_value);
let new_value_bytes = self.serialize_value(new_value);
self.update_inner(new_key_bytes, Some(old_value_bytes), new_value_bytes);
}
pub fn update_without_old_value(&mut self, new_value: impl Row) {
let new_pk = (&new_value).project(self.pk_indices());
let new_key_bytes =
serialize_pk_with_vnode(new_pk, &self.pk_serde, self.compute_vnode_by_pk(new_pk));
let new_value_bytes = self.serialize_value(new_value);
self.update_inner(new_key_bytes, None, new_value_bytes);
}
pub fn write_record(&mut self, record: Record<impl Row>) {
match record {
Record::Insert { new_row } => self.insert(new_row),
Record::Delete { old_row } => self.delete(old_row),
Record::Update { old_row, new_row } => self.update(old_row, new_row),
}
}
fn fill_non_output_indices(&self, chunk: StreamChunk) -> StreamChunk {
fill_non_output_indices(&self.i2o_mapping, &self.data_types, chunk)
}
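    /// Write a `StreamChunk` into the state table. Keys are serialized with their vnode
    /// prefix, invisible rows are skipped, and insert/delete ops are applied to the
    /// local state store.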
#[allow(clippy::disallowed_methods)]
pub fn write_chunk(&mut self, chunk: StreamChunk) {
let chunk = if IS_REPLICATED {
self.fill_non_output_indices(chunk)
} else {
chunk
};
let (chunk, op) = chunk.into_parts();
let vnodes = self
.distribution
.compute_chunk_vnode(&chunk, &self.pk_indices);
let values = if let Some(ref value_indices) = self.value_indices {
chunk.project(value_indices).serialize_with(&self.row_serde)
} else {
chunk.serialize_with(&self.row_serde)
};
let key_chunk = chunk.project(self.pk_indices());
let vnode_and_pks = key_chunk
.rows_with_holes()
.zip_eq_fast(vnodes.iter())
.map(|(r, vnode)| {
let mut buffer = BytesMut::new();
buffer.put_slice(&vnode.to_be_bytes()[..]);
if let Some(r) = r {
self.pk_serde.serialize(r, &mut buffer);
}
(r, buffer.freeze())
})
.collect_vec();
if !key_chunk.is_compacted() {
for ((op, (key, key_bytes), value), vis) in
izip!(op.iter(), vnode_and_pks, values).zip_eq_debug(key_chunk.visibility().iter())
{
if vis {
match op {
Op::Insert | Op::UpdateInsert => {
if USE_WATERMARK_CACHE && let Some(ref pk) = key {
self.watermark_cache.insert(pk);
}
self.insert_inner(TableKey(key_bytes), value);
}
Op::Delete | Op::UpdateDelete => {
if USE_WATERMARK_CACHE && let Some(ref pk) = key {
self.watermark_cache.delete(pk);
}
self.delete_inner(TableKey(key_bytes), value);
}
}
}
}
} else {
for (op, (key, key_bytes), value) in izip!(op.iter(), vnode_and_pks, values) {
match op {
Op::Insert | Op::UpdateInsert => {
if USE_WATERMARK_CACHE && let Some(ref pk) = key {
self.watermark_cache.insert(pk);
}
self.insert_inner(TableKey(key_bytes), value);
}
Op::Delete | Op::UpdateDelete => {
if USE_WATERMARK_CACHE && let Some(ref pk) = key {
self.watermark_cache.delete(pk);
}
self.delete_inner(TableKey(key_bytes), value);
}
}
}
}
}
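    /// Record a watermark for state cleaning. It takes effect on the next `commit`.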
pub fn update_watermark(&mut self, watermark: ScalarImpl) {
trace!(table_id = %self.table_id, watermark = ?watermark, "update watermark");
self.pending_watermark = Some(watermark);
}
pub fn get_committed_watermark(&self) -> Option<&ScalarImpl> {
self.committed_watermark.as_ref()
}
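    /// Flush pending writes, seal the current epoch with any pending state-cleaning
    /// watermark, and advance the table to `new_epoch`.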
pub async fn commit(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
self.commit_inner(new_epoch, None).await
}
pub async fn commit_may_switch_consistent_op(
&mut self,
new_epoch: EpochPair,
op_consistency_level: StateTableOpConsistencyLevel,
) -> StreamExecutorResult<()> {
if self.op_consistency_level != op_consistency_level {
info!(
?new_epoch,
prev_op_consistency_level = ?self.op_consistency_level,
?op_consistency_level,
table_id = self.table_id.table_id,
"switch to new op consistency level"
);
self.commit_inner(new_epoch, Some(op_consistency_level))
.await
} else {
self.commit_inner(new_epoch, None).await
}
}
async fn commit_inner(
&mut self,
new_epoch: EpochPair,
switch_consistent_op: Option<StateTableOpConsistencyLevel>,
) -> StreamExecutorResult<()> {
assert_eq!(self.epoch(), new_epoch.prev);
let switch_op_consistency_level = switch_consistent_op.map(|new_consistency_level| {
assert_ne!(self.op_consistency_level, new_consistency_level);
self.op_consistency_level = new_consistency_level;
match new_consistency_level {
StateTableOpConsistencyLevel::Inconsistent => OpConsistencyLevel::Inconsistent,
StateTableOpConsistencyLevel::ConsistentOldValue => {
consistent_old_value_op(self.row_serde.clone(), false)
}
StateTableOpConsistencyLevel::LogStoreEnabled => {
consistent_old_value_op(self.row_serde.clone(), true)
}
}
});
trace!(
table_id = %self.table_id,
epoch = ?self.epoch(),
"commit state table"
);
let mut table_watermarks = None;
if self.is_dirty() {
self.local_store
.flush()
.instrument(tracing::info_span!("state_table_flush"))
.await?;
table_watermarks = self.commit_pending_watermark();
}
self.local_store.seal_current_epoch(
new_epoch.curr,
SealCurrentEpochOptions {
table_watermarks,
switch_op_consistency_level,
},
);
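        // If the watermark cache is enabled but not synced, refill it with the smallest
        // primary keys at or above the committed watermark across all owned vnodes.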
if USE_WATERMARK_CACHE && !self.watermark_cache.is_synced() {
if let Some(ref watermark) = self.committed_watermark {
let range: (Bound<Once<Datum>>, Bound<Once<Datum>>) =
(Included(once(Some(watermark.clone()))), Unbounded);
let mut pks = Vec::with_capacity(self.watermark_cache.capacity());
{
let mut streams = vec![];
for vnode in self.vnodes().iter_vnodes() {
let stream = self
.iter_with_vnode(vnode, &range, PrefetchOptions::default())
.await?;
streams.push(Box::pin(stream));
}
let merged_stream = merge_sort(streams);
pin_mut!(merged_stream);
#[for_await]
for entry in merged_stream.take(self.watermark_cache.capacity()) {
let keyed_row = entry?;
let pk = self.pk_serde.deserialize(keyed_row.key())?;
if !pk.is_null_at(0) {
pks.push(pk);
}
}
}
let mut filler = self.watermark_cache.begin_syncing();
for pk in pks {
filler.insert_unchecked(DefaultOrdered(pk), ());
}
filler.finish();
let n_cache_entries = self.watermark_cache.len();
if n_cache_entries < self.watermark_cache.capacity() {
self.watermark_cache.set_table_row_count(n_cache_entries);
}
}
}
Ok(())
}
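    /// Convert the pending watermark into per-vnode table watermarks to be sealed with the
    /// epoch. Returns `None` when there is no pending watermark or when state cleaning can
    /// be skipped because the cached lowest key is still above the watermark.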
fn commit_pending_watermark(&mut self) -> Option<(WatermarkDirection, Vec<VnodeWatermark>)> {
let watermark = self.pending_watermark.take();
watermark.as_ref().inspect(|watermark| {
trace!(table_id = %self.table_id, watermark = ?watermark, "state cleaning");
});
let prefix_serializer = if self.pk_indices().is_empty() {
None
} else {
Some(self.pk_serde.prefix(1))
};
let should_clean_watermark = match watermark {
Some(ref watermark) => {
if USE_WATERMARK_CACHE && self.watermark_cache.is_synced() {
if let Some(key) = self.watermark_cache.lowest_key() {
watermark.as_scalar_ref_impl().default_cmp(&key).is_ge()
} else {
false
}
} else {
true
}
}
None => false,
};
let watermark_suffix = watermark.as_ref().map(|watermark| {
serialize_pk(
row::once(Some(watermark.clone())),
prefix_serializer.as_ref().unwrap(),
)
});
let mut seal_watermark: Option<(WatermarkDirection, VnodeWatermark)> = None;
if should_clean_watermark && let Some(watermark_suffix) = watermark_suffix {
trace!(table_id = %self.table_id, watermark = ?watermark_suffix, vnodes = ?{
self.vnodes().iter_vnodes().collect_vec()
}, "delete range");
if prefix_serializer
.as_ref()
.unwrap()
.get_order_types()
.first()
.unwrap()
.is_ascending()
{
seal_watermark = Some((
WatermarkDirection::Ascending,
VnodeWatermark::new(
self.vnodes().clone(),
Bytes::copy_from_slice(watermark_suffix.as_ref()),
),
));
} else {
seal_watermark = Some((
WatermarkDirection::Descending,
VnodeWatermark::new(
self.vnodes().clone(),
Bytes::copy_from_slice(watermark_suffix.as_ref()),
),
));
}
}
self.committed_watermark = watermark;
if USE_WATERMARK_CACHE && seal_watermark.is_some() {
self.watermark_cache.clear();
}
seal_watermark.map(|(direction, watermark)| (direction, vec![watermark]))
}
pub async fn try_flush(&mut self) -> StreamExecutorResult<()> {
self.local_store.try_flush().await?;
Ok(())
}
}
pub trait KeyedRowStream<'a>: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a {}
impl<'a, T> KeyedRowStream<'a> for T where
T: Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + 'a
{
}
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
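    /// Iterate over the rows of a single vnode within the given pk range, yielding
    /// vnode-prefixed keyed rows.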
pub async fn iter_with_vnode(
&self,
vnode: VirtualNode,
pk_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
Ok(deserialize_keyed_row_stream(
self.iter_kv_with_pk_range(pk_range, vnode, prefetch_options)
.await?,
&self.row_serde,
))
}
pub async fn iter_with_vnode_and_output_indices(
&self,
vnode: VirtualNode,
pk_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<KeyedRow<Bytes>>> + '_> {
assert!(IS_REPLICATED);
let stream = self
.iter_with_vnode(vnode, pk_range, prefetch_options)
.await?;
Ok(stream.map(|row| {
row.map(|keyed_row| {
let (vnode_prefixed_key, row) = keyed_row.into_parts();
let row = row.project(&self.output_indices).into_owned_row();
KeyedRow::new(vnode_prefixed_key, row)
})
}))
}
async fn iter_kv(
&self,
table_key_range: TableKeyRange,
prefix_hint: Option<Bytes>,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
let read_options = ReadOptions {
prefix_hint,
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
prefetch_options,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
};
Ok(self.local_store.iter(table_key_range, read_options).await?)
}
async fn rev_iter_kv(
&self,
table_key_range: TableKeyRange,
prefix_hint: Option<Bytes>,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<<S::Local as LocalStateStore>::RevIter<'_>> {
let read_options = ReadOptions {
prefix_hint,
retention_seconds: self.table_option.retention_seconds,
table_id: self.table_id,
prefetch_options,
cache_policy: CachePolicy::Fill(CacheContext::Default),
..Default::default()
};
Ok(self
.local_store
.rev_iter(table_key_range, read_options)
.await?)
}
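    /// Iterate over rows whose pk starts with `pk_prefix`, optionally restricted by
    /// `sub_range` on the remaining pk columns. The prefix must be sufficient to
    /// determine the vnode.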
pub async fn iter_with_prefix(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
self.iter_with_prefix_inner::<false>(pk_prefix, sub_range, prefetch_options)
.await
}
pub async fn rev_iter_with_prefix(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
self.iter_with_prefix_inner::<true>(pk_prefix, sub_range, prefetch_options)
.await
}
async fn iter_with_prefix_inner<const REVERSE: bool>(
&self,
pk_prefix: impl Row,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<impl KeyedRowStream<'_>> {
let prefix_serializer = self.pk_serde.prefix(pk_prefix.len());
let encoded_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
let vnode = self.compute_prefix_vnode(&pk_prefix);
let pk_prefix_indices = &self.pk_indices[..pk_prefix.len()];
if self.prefix_hint_len != 0 {
debug_assert_eq!(self.prefix_hint_len, pk_prefix.len());
}
let prefix_hint = {
if self.prefix_hint_len == 0 || self.prefix_hint_len > pk_prefix.len() {
None
} else {
let encoded_prefix_len = self
.pk_serde
.deserialize_prefix_len(&encoded_prefix, self.prefix_hint_len)?;
Some(Bytes::copy_from_slice(
&encoded_prefix[..encoded_prefix_len],
))
}
};
trace!(
table_id = %self.table_id(),
?prefix_hint, ?pk_prefix,
?pk_prefix_indices,
iter_direction = if REVERSE { "reverse" } else { "forward" },
"storage_iter_with_prefix"
);
let memcomparable_range =
prefix_and_sub_range_to_memcomparable(&self.pk_serde, sub_range, pk_prefix);
let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
Ok(if REVERSE {
futures::future::Either::Left(deserialize_keyed_row_stream(
self.rev_iter_kv(
memcomparable_range_with_vnode,
prefix_hint,
prefetch_options,
)
.await?,
&self.row_serde,
))
} else {
futures::future::Either::Right(deserialize_keyed_row_stream(
self.iter_kv(
memcomparable_range_with_vnode,
prefix_hint,
prefetch_options,
)
.await?,
&self.row_serde,
))
})
}
async fn iter_kv_with_pk_range(
&self,
pk_range: &(Bound<impl Row>, Bound<impl Row>),
vnode: VirtualNode,
prefetch_options: PrefetchOptions,
) -> StreamExecutorResult<<S::Local as LocalStateStore>::Iter<'_>> {
let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
self.iter_kv(memcomparable_range_with_vnode, None, prefetch_options)
.await
.map_err(StreamExecutorError::from)
}
#[cfg(test)]
pub fn get_watermark_cache(&self) -> &StateTableWatermarkCache {
&self.watermark_cache
}
}
impl<S, SD, const IS_REPLICATED: bool, const USE_WATERMARK_CACHE: bool>
StateTableInner<S, SD, IS_REPLICATED, USE_WATERMARK_CACHE>
where
S: StateStore,
SD: ValueRowSerde,
{
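    /// Iterate over the change log of a single vnode within the given epoch range and
    /// pk range, yielding change-log rows.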
pub async fn iter_log_with_vnode(
&self,
vnode: VirtualNode,
epoch_range: (u64, u64),
pk_range: &(Bound<impl Row>, Bound<impl Row>),
) -> StreamExecutorResult<impl Stream<Item = StreamExecutorResult<ChangeLogRow>> + '_> {
let memcomparable_range = prefix_range_to_memcomparable(&self.pk_serde, pk_range);
let memcomparable_range_with_vnode = prefixed_range_with_vnode(memcomparable_range, vnode);
Ok(deserialize_log_stream(
self.store
.iter_log(
epoch_range,
memcomparable_range_with_vnode,
ReadLogOptions {
table_id: self.table_id,
},
)
.await?,
&self.row_serde,
)
.map_err(Into::into))
}
}
fn deserialize_keyed_row_stream<'a>(
iter: impl StateStoreIter + 'a,
deserializer: &'a impl ValueRowSerde,
) -> impl KeyedRowStream<'a> {
iter.into_stream(move |(key, value)| {
Ok(KeyedRow::new(
key.user_key.table_key.copy_into(),
deserializer.deserialize(value).map(OwnedRow::new)?,
))
})
.map_err(Into::into)
}
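/// Convert a pk range of `Row` bounds into memcomparable byte bounds.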
pub fn prefix_range_to_memcomparable(
pk_serde: &OrderedRowSerde,
range: &(Bound<impl Row>, Bound<impl Row>),
) -> (Bound<Bytes>, Bound<Bytes>) {
(
start_range_to_memcomparable(pk_serde, &range.0),
end_range_to_memcomparable(pk_serde, &range.1, None),
)
}
fn prefix_and_sub_range_to_memcomparable(
pk_serde: &OrderedRowSerde,
sub_range: &(Bound<impl Row>, Bound<impl Row>),
pk_prefix: impl Row,
) -> (Bound<Bytes>, Bound<Bytes>) {
let (range_start, range_end) = sub_range;
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
let serialized_pk_prefix = serialize_pk(&pk_prefix, &prefix_serializer);
let start_range = match range_start {
Included(start_range) => Bound::Included(Either::Left((&pk_prefix).chain(start_range))),
Excluded(start_range) => Bound::Excluded(Either::Left((&pk_prefix).chain(start_range))),
Unbounded => Bound::Included(Either::Right(&pk_prefix)),
};
let end_range = match range_end {
Included(end_range) => Bound::Included((&pk_prefix).chain(end_range)),
Excluded(end_range) => Bound::Excluded((&pk_prefix).chain(end_range)),
Unbounded => Unbounded,
};
(
start_range_to_memcomparable(pk_serde, &start_range),
end_range_to_memcomparable(pk_serde, &end_range, Some(serialized_pk_prefix)),
)
}
fn start_range_to_memcomparable<R: Row>(
pk_serde: &OrderedRowSerde,
bound: &Bound<R>,
) -> Bound<Bytes> {
let serialize_pk_prefix = |pk_prefix: &R| {
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
serialize_pk(pk_prefix, &prefix_serializer)
};
match bound {
Unbounded => Unbounded,
Included(r) => {
let serialized = serialize_pk_prefix(r);
Included(serialized)
}
Excluded(r) => {
let serialized = serialize_pk_prefix(r);
start_bound_of_excluded_prefix(&serialized)
}
}
}
fn end_range_to_memcomparable<R: Row>(
pk_serde: &OrderedRowSerde,
bound: &Bound<R>,
serialized_pk_prefix: Option<Bytes>,
) -> Bound<Bytes> {
let serialize_pk_prefix = |pk_prefix: &R| {
let prefix_serializer = pk_serde.prefix(pk_prefix.len());
serialize_pk(pk_prefix, &prefix_serializer)
};
match bound {
Unbounded => match serialized_pk_prefix {
Some(serialized_pk_prefix) => end_bound_of_prefix(&serialized_pk_prefix),
None => Unbounded,
},
Included(r) => {
let serialized = serialize_pk_prefix(r);
end_bound_of_prefix(&serialized)
}
Excluded(r) => {
let serialized = serialize_pk_prefix(r);
Excluded(serialized)
}
}
}
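/// For replicated tables, rebuild the chunk in the full table schema by filling columns
/// that are not part of the output with NULLs, according to `i2o_mapping`.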
fn fill_non_output_indices(
i2o_mapping: &ColIndexMapping,
data_types: &[DataType],
chunk: StreamChunk,
) -> StreamChunk {
let cardinality = chunk.cardinality();
let (ops, columns, vis) = chunk.into_inner();
let mut full_columns = Vec::with_capacity(data_types.len());
for (i, data_type) in data_types.iter().enumerate() {
if let Some(j) = i2o_mapping.try_map(i) {
full_columns.push(columns[j].clone());
} else {
let mut column_builder = ArrayImplBuilder::with_type(cardinality, data_type.clone());
column_builder.append_n_null(cardinality);
let column: ArrayRef = column_builder.finish().into();
full_columns.push(column)
}
}
let data_chunk = DataChunk::new(full_columns, vis);
StreamChunk::from_parts(ops, data_chunk)
}
#[cfg(test)]
mod tests {
use std::fmt::Debug;
use expect_test::{expect, Expect};
use super::*;
fn check(actual: impl Debug, expect: Expect) {
let actual = format!("{:#?}", actual);
expect.assert_eq(&actual);
}
#[test]
fn test_fill_non_output_indices() {
let data_types = vec![DataType::Int32, DataType::Int32, DataType::Int32];
let replicated_chunk = [OwnedRow::new(vec![
Some(222_i32.into()),
Some(2_i32.into()),
])];
let replicated_chunk = StreamChunk::from_parts(
vec![Op::Insert],
DataChunk::from_rows(&replicated_chunk, &[DataType::Int32, DataType::Int32]),
);
let i2o_mapping = ColIndexMapping::new(vec![Some(1), None, Some(0)], 2);
let filled_chunk = fill_non_output_indices(&i2o_mapping, &data_types, replicated_chunk);
check(
filled_chunk,
expect![[r#"
StreamChunk { cardinality: 1, capacity: 1, data:
+---+---+---+-----+
| + | 2 | | 222 |
+---+---+---+-----+
}"#]],
);
}
}