// risingwave_stream/executor/source/source_backfill_state_table.rs
use std::ops::Bound;
use futures::{pin_mut, StreamExt};
use risingwave_common::row;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::SplitId;
use risingwave_pb::catalog::PbTable;
use risingwave_storage::StateStore;
use super::source_backfill_executor::{BackfillStateWithProgress, BackfillStates};
use crate::common::table::state_table::StateTable;
use crate::executor::error::StreamExecutorError;
use crate::executor::StreamExecutorResult;
/// Handler for the state table that persists per-split backfill progress
/// for the `SourceBackfill` executor across checkpoints.
///
/// Row layout (as used by the accessors below): column 0 is the split id
/// (utf8, primary key), column 1 is the serialized state (jsonb).
pub struct BackfillStateTableHandler<S: StateStore> {
    /// Underlying keyed state table in the given state store backend.
    pub state_store: StateTable<S>,
}
impl<S: StateStore> BackfillStateTableHandler<S> {
    /// Builds a handler whose state table is constructed from the given
    /// catalog, with no vnode restriction (`None`).
    pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
        Self {
            state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
        }
    }

    /// Initializes the underlying state table for the given epoch pair.
    /// Must be called before any read/write in a new epoch lifecycle.
    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state_store.init_epoch(epoch).await
    }

    /// Converts a string-like key into the utf8 scalar used as the table's
    /// primary-key datum.
    fn string_to_scalar(rhs: impl Into<String>) -> ScalarImpl {
        ScalarImpl::Utf8(rhs.into().into_boxed_str())
    }

    /// Decodes one state-table row (column 1, jsonb) into a
    /// `BackfillStateWithProgress`.
    ///
    /// Shared by `scan` and `try_recover_from_state_store` so the row schema
    /// is interpreted in exactly one place. Panics if the state column is not
    /// a non-null jsonb value, which would indicate a corrupted table.
    fn deserialize_state(row: &OwnedRow) -> StreamExecutorResult<BackfillStateWithProgress> {
        match row.datum_at(1) {
            Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
                Ok(BackfillStateWithProgress::restore_from_json(
                    jsonb_ref.to_owned_scalar(),
                )?)
            }
            _ => unreachable!("source backfill state column (index 1) must be non-null jsonb"),
        }
    }

    /// Point-gets the raw row for `key`, or `None` if the split has no
    /// persisted state.
    pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult<Option<OwnedRow>> {
        self.state_store
            .get_row(row::once(Some(Self::string_to_scalar(key.as_ref()))))
            .await
            .map_err(StreamExecutorError::from)
    }

    /// Full scan of the table, decoding every row into its backfill state.
    ///
    /// Note: iterates the whole (unbounded) key range of this executor's
    /// partition of the table.
    pub async fn scan(&self) -> StreamExecutorResult<Vec<BackfillStateWithProgress>> {
        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
        let state_table_iter = self
            .state_store
            .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
            .await?;
        pin_mut!(state_table_iter);
        let mut ret = vec![];
        while let Some(item) = state_table_iter.next().await {
            let row = item?.into_owned_row();
            ret.push(Self::deserialize_state(&row)?);
        }
        tracing::trace!("scan SourceBackfill state table: {:?}", ret);
        Ok(ret)
    }

    /// Upserts the state for `key`: reads the previous row first so the
    /// state-table write is recorded as an update rather than a blind insert.
    async fn set(
        &mut self,
        key: SplitId,
        state: BackfillStateWithProgress,
    ) -> StreamExecutorResult<()> {
        let row = [
            Some(Self::string_to_scalar(key.as_ref())),
            Some(ScalarImpl::Jsonb(state.encode_to_json())),
        ];
        match self.get(&key).await? {
            Some(prev_row) => {
                self.state_store.update(prev_row, row);
            }
            None => {
                self.state_store.insert(row);
            }
        }
        Ok(())
    }

    /// Deletes the state row for `key`, if any. A missing row is a no-op.
    pub async fn delete(&mut self, key: &SplitId) -> StreamExecutorResult<()> {
        if let Some(prev_row) = self.get(key).await? {
            self.state_store.delete(prev_row);
        }
        Ok(())
    }

    /// Persists every `(split_id, state)` pair in `states`.
    pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> {
        for (split_id, state) in states {
            self.set(split_id, state).await?;
        }
        Ok(())
    }

    /// Removes state for splits that are no longer assigned to this executor.
    pub async fn trim_state(
        &mut self,
        to_trim: impl IntoIterator<Item = SplitId>,
    ) -> StreamExecutorResult<()> {
        for split_id in to_trim {
            tracing::info!("trimming source state for split {}", split_id);
            self.delete(&split_id).await?;
        }
        Ok(())
    }

    /// Fetches and decodes the persisted state for `split_id`, returning
    /// `None` when no state was ever checkpointed for it.
    pub async fn try_recover_from_state_store(
        &mut self,
        split_id: &SplitId,
    ) -> StreamExecutorResult<Option<BackfillStateWithProgress>> {
        self.get(split_id)
            .await?
            .map(|row| Self::deserialize_state(&row))
            .transpose()
    }
}