risingwave_stream/executor/source/
source_backfill_state_table.rs1use std::ops::Bound;
16
17use futures::{StreamExt, pin_mut};
18use risingwave_common::row;
19use risingwave_common::row::{OwnedRow, Row};
20use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_connector::source::SplitId;
23use risingwave_pb::catalog::PbTable;
24use risingwave_storage::StateStore;
25
26use super::source_backfill_executor::{BackfillStateWithProgress, BackfillStates};
27use crate::common::table::state_table::{StateTable, StateTableBuilder};
28use crate::executor::StreamExecutorResult;
29
30pub struct BackfillStateTableHandler<S: StateStore> {
31 state_store: StateTable<S>,
32}
33
34impl<S: StateStore> BackfillStateTableHandler<S> {
35 pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
37 Self {
38 state_store: StateTableBuilder::new(table_catalog, store, None)
42 .forbid_preload_all_rows()
43 .build()
44 .await,
45 }
46 }
47
48 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
49 self.state_store.init_epoch(epoch).await
50 }
51
52 fn string_to_scalar(rhs: impl Into<String>) -> ScalarImpl {
53 ScalarImpl::Utf8(rhs.into().into_boxed_str())
54 }
55
56 pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult<Option<OwnedRow>> {
57 self.state_store
58 .get_row(row::once(Some(Self::string_to_scalar(key.as_ref()))))
59 .await
60 }
61
62 pub async fn scan_may_stale(&self) -> StreamExecutorResult<Vec<BackfillStateWithProgress>> {
64 let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
65
66 let state_table_iter = self
67 .state_store
68 .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
69 .await?;
70 pin_mut!(state_table_iter);
71
72 let mut ret = vec![];
73 while let Some(item) = state_table_iter.next().await {
74 let row = item?.into_owned_row();
75 let state = match row.datum_at(1) {
76 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
77 BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())?
78 }
79 _ => unreachable!(),
80 };
81 ret.push(state);
82 }
83 tracing::trace!("scan SourceBackfill state table: {:?}", ret);
84 Ok(ret)
85 }
86
87 async fn set(
88 &mut self,
89 key: SplitId,
90 state: BackfillStateWithProgress,
91 ) -> StreamExecutorResult<()> {
92 let row = [
93 Some(Self::string_to_scalar(key.as_ref())),
94 Some(ScalarImpl::Jsonb(state.encode_to_json())),
95 ];
96 match self.get(&key).await? {
97 Some(prev_row) => {
98 self.state_store.update(prev_row, row);
99 }
100 None => {
101 self.state_store.insert(row);
102 }
103 }
104 Ok(())
105 }
106
107 pub async fn delete(&mut self, key: &SplitId) -> StreamExecutorResult<()> {
108 if let Some(prev_row) = self.get(key).await? {
109 self.state_store.delete(prev_row);
110 }
111
112 Ok(())
113 }
114
115 pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> {
116 for (split_id, state) in states {
117 self.set(split_id, state).await?;
118 }
119 Ok(())
120 }
121
122 pub async fn trim_state(
123 &mut self,
124 to_trim: impl IntoIterator<Item = SplitId>,
125 ) -> StreamExecutorResult<()> {
126 for split_id in to_trim {
127 tracing::info!("trimming source state for split {}", split_id);
128 self.delete(&split_id).await?;
129 }
130
131 Ok(())
132 }
133
134 pub(super) fn state_store(&self) -> &StateTable<S> {
135 &self.state_store
136 }
137
138 pub(super) async fn commit(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
139 self.state_store
140 .commit_assert_no_update_vnode_bitmap(epoch)
141 .await?;
142 Ok(())
143 }
144
145 pub(super) async fn new_committed_reader(
153 &self,
154 epoch: EpochPair,
155 ) -> StreamExecutorResult<BackfillStateTableCommittedReader<'_, S>> {
156 self.state_store
157 .try_wait_committed_epoch(epoch.prev)
158 .await?;
159 Ok(BackfillStateTableCommittedReader { handle: self })
160 }
161}
162
163pub(super) struct BackfillStateTableCommittedReader<'a, S: StateStore> {
164 handle: &'a BackfillStateTableHandler<S>,
165}
166
167impl<S: StateStore> BackfillStateTableCommittedReader<'_, S> {
168 pub(super) async fn try_recover_from_state_store(
169 &self,
170 split_id: &SplitId,
171 ) -> StreamExecutorResult<Option<BackfillStateWithProgress>> {
172 Ok(self
173 .handle
174 .get(split_id)
175 .await?
176 .map(|row| match row.datum_at(1) {
177 Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
178 BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())
179 }
180 _ => unreachable!(),
181 })
182 .transpose()?)
183 }
184}