risingwave_stream/executor/source/
source_backfill_state_table.rs1use std::ops::Bound;
16
17use futures::{StreamExt, pin_mut};
18use risingwave_common::row;
19use risingwave_common::row::{OwnedRow, Row};
20use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_connector::source::SplitId;
23use risingwave_pb::catalog::PbTable;
24use risingwave_storage::StateStore;
25
26use super::source_backfill_executor::{BackfillStateWithProgress, BackfillStates};
27use crate::common::table::state_table::StateTable;
28use crate::executor::StreamExecutorResult;
29
/// Wrapper around a [`StateTable`] that persists per-split source backfill
/// progress. Rows are keyed by the split id (stored as a utf8 scalar) and the
/// progress itself is serialized as jsonb in the second column.
pub struct BackfillStateTableHandler<S: StateStore> {
    // Underlying state table; pk = split id string, column 1 = jsonb-encoded
    // `BackfillStateWithProgress`.
    state_store: StateTable<S>,
}
33
34impl<S: StateStore> BackfillStateTableHandler<S> {
35 pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
37 Self {
38 state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
39 }
40 }
41
42 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
43 self.state_store.init_epoch(epoch).await
44 }
45
46 fn string_to_scalar(rhs: impl Into<String>) -> ScalarImpl {
47 ScalarImpl::Utf8(rhs.into().into_boxed_str())
48 }
49
50 pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult<Option<OwnedRow>> {
51 self.state_store
52 .get_row(row::once(Some(Self::string_to_scalar(key.as_ref()))))
53 .await
54 }
55
56 pub async fn scan_may_stale(&self) -> StreamExecutorResult<Vec<BackfillStateWithProgress>> {
58 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
59
60 let state_table_iter = self
61 .state_store
62 .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
63 .await?;
64 pin_mut!(state_table_iter);
65
66 let mut ret = vec![];
67 while let Some(item) = state_table_iter.next().await {
68 let row = item?.into_owned_row();
69 let state = match row.datum_at(1) {
70 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
71 BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())?
72 }
73 _ => unreachable!(),
74 };
75 ret.push(state);
76 }
77 tracing::trace!("scan SourceBackfill state table: {:?}", ret);
78 Ok(ret)
79 }
80
81 async fn set(
82 &mut self,
83 key: SplitId,
84 state: BackfillStateWithProgress,
85 ) -> StreamExecutorResult<()> {
86 let row = [
87 Some(Self::string_to_scalar(key.as_ref())),
88 Some(ScalarImpl::Jsonb(state.encode_to_json())),
89 ];
90 match self.get(&key).await? {
91 Some(prev_row) => {
92 self.state_store.update(prev_row, row);
93 }
94 None => {
95 self.state_store.insert(row);
96 }
97 }
98 Ok(())
99 }
100
101 pub async fn delete(&mut self, key: &SplitId) -> StreamExecutorResult<()> {
102 if let Some(prev_row) = self.get(key).await? {
103 self.state_store.delete(prev_row);
104 }
105
106 Ok(())
107 }
108
109 pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> {
110 for (split_id, state) in states {
111 self.set(split_id, state).await?;
112 }
113 Ok(())
114 }
115
116 pub async fn trim_state(
117 &mut self,
118 to_trim: impl IntoIterator<Item = SplitId>,
119 ) -> StreamExecutorResult<()> {
120 for split_id in to_trim {
121 tracing::info!("trimming source state for split {}", split_id);
122 self.delete(&split_id).await?;
123 }
124
125 Ok(())
126 }
127
128 pub(super) fn state_store(&self) -> &StateTable<S> {
129 &self.state_store
130 }
131
132 pub(super) async fn commit(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
133 self.state_store
134 .commit_assert_no_update_vnode_bitmap(epoch)
135 .await?;
136 Ok(())
137 }
138
139 pub(super) async fn new_committed_reader(
147 &self,
148 epoch: EpochPair,
149 ) -> StreamExecutorResult<BackfillStateTableCommittedReader<'_, S>> {
150 self.state_store
151 .try_wait_committed_epoch(epoch.prev)
152 .await?;
153 Ok(BackfillStateTableCommittedReader { handle: self })
154 }
155}
156
/// Reader handed out by `BackfillStateTableHandler::new_committed_reader`,
/// which is only constructed after waiting for the previous epoch to be
/// committed — so reads through it observe committed state.
pub(super) struct BackfillStateTableCommittedReader<'a, S: StateStore> {
    // Borrowed handler whose state table is read; lifetime ties the reader to it.
    handle: &'a BackfillStateTableHandler<S>,
}
160
161impl<S: StateStore> BackfillStateTableCommittedReader<'_, S> {
162 pub(super) async fn try_recover_from_state_store(
163 &self,
164 split_id: &SplitId,
165 ) -> StreamExecutorResult<Option<BackfillStateWithProgress>> {
166 Ok(self
167 .handle
168 .get(split_id)
169 .await?
170 .map(|row| match row.datum_at(1) {
171 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
172 BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())
173 }
174 _ => unreachable!(),
175 })
176 .transpose()?)
177 }
178}