// risingwave_stream/executor/source/source_backfill_state_table.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::ops::Bound;
16
17use futures::{StreamExt, pin_mut};
18use risingwave_common::row;
19use risingwave_common::row::{OwnedRow, Row};
20use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_connector::source::SplitId;
23use risingwave_pb::catalog::PbTable;
24use risingwave_storage::StateStore;
25
26use super::source_backfill_executor::{BackfillStateWithProgress, BackfillStates};
27use crate::common::table::state_table::StateTable;
28use crate::executor::StreamExecutorResult;
29
/// Handle to the state table that persists per-split backfill progress for the
/// source backfill executor. Rows are keyed by split id; the progress payload
/// is stored as jsonb (see `scan_may_stale` / `set` for the row layout).
pub struct BackfillStateTableHandler<S: StateStore> {
    // Underlying state table; all reads/writes go through this.
    state_store: StateTable<S>,
}
33
34impl<S: StateStore> BackfillStateTableHandler<S> {
35    /// See also [`super::SourceStateTableHandler::from_table_catalog`] for how the state table looks like.
36    pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
37        Self {
38            state_store: StateTable::from_table_catalog(table_catalog, store, None).await,
39        }
40    }
41
42    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
43        self.state_store.init_epoch(epoch).await
44    }
45
46    fn string_to_scalar(rhs: impl Into<String>) -> ScalarImpl {
47        ScalarImpl::Utf8(rhs.into().into_boxed_str())
48    }
49
50    pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult<Option<OwnedRow>> {
51        self.state_store
52            .get_row(row::once(Some(Self::string_to_scalar(key.as_ref()))))
53            .await
54    }
55
56    /// XXX: we might get stale data for other actors' writes, but it's fine?
57    pub async fn scan_may_stale(&self) -> StreamExecutorResult<Vec<BackfillStateWithProgress>> {
58        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
59
60        let state_table_iter = self
61            .state_store
62            .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
63            .await?;
64        pin_mut!(state_table_iter);
65
66        let mut ret = vec![];
67        while let Some(item) = state_table_iter.next().await {
68            let row = item?.into_owned_row();
69            let state = match row.datum_at(1) {
70                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
71                    BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())?
72                }
73                _ => unreachable!(),
74            };
75            ret.push(state);
76        }
77        tracing::trace!("scan SourceBackfill state table: {:?}", ret);
78        Ok(ret)
79    }
80
81    async fn set(
82        &mut self,
83        key: SplitId,
84        state: BackfillStateWithProgress,
85    ) -> StreamExecutorResult<()> {
86        let row = [
87            Some(Self::string_to_scalar(key.as_ref())),
88            Some(ScalarImpl::Jsonb(state.encode_to_json())),
89        ];
90        match self.get(&key).await? {
91            Some(prev_row) => {
92                self.state_store.update(prev_row, row);
93            }
94            None => {
95                self.state_store.insert(row);
96            }
97        }
98        Ok(())
99    }
100
101    pub async fn delete(&mut self, key: &SplitId) -> StreamExecutorResult<()> {
102        if let Some(prev_row) = self.get(key).await? {
103            self.state_store.delete(prev_row);
104        }
105
106        Ok(())
107    }
108
109    pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> {
110        for (split_id, state) in states {
111            self.set(split_id, state).await?;
112        }
113        Ok(())
114    }
115
116    pub async fn trim_state(
117        &mut self,
118        to_trim: impl IntoIterator<Item = SplitId>,
119    ) -> StreamExecutorResult<()> {
120        for split_id in to_trim {
121            tracing::info!("trimming source state for split {}", split_id);
122            self.delete(&split_id).await?;
123        }
124
125        Ok(())
126    }
127
128    pub(super) fn state_store(&self) -> &StateTable<S> {
129        &self.state_store
130    }
131
132    pub(super) async fn commit(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
133        self.state_store
134            .commit_assert_no_update_vnode_bitmap(epoch)
135            .await?;
136        Ok(())
137    }
138
139    /// When calling `try_recover_from_state_store`, we may read the state written by other source parallelisms in
140    /// the previous `epoch`. Therefore, we need to explicitly create a `BackfillStateTableCommittedReader` to do
141    /// `try_recover_from_state_store`. Before returning the reader, we will do `try_wait_committed_epoch` to ensure
142    /// that we are able to read all data committed in `epoch`.
143    ///
144    /// Note that, we need to ensure that the barrier of `epoch` must have been yielded before creating the committed reader,
145    /// and otherwise the `try_wait_committed_epoch` will block the barrier of `epoch`, and cause deadlock.
146    pub(super) async fn new_committed_reader(
147        &self,
148        epoch: EpochPair,
149    ) -> StreamExecutorResult<BackfillStateTableCommittedReader<'_, S>> {
150        self.state_store
151            .try_wait_committed_epoch(epoch.prev)
152            .await?;
153        Ok(BackfillStateTableCommittedReader { handle: self })
154    }
155}
156
/// Reader obtained from [`BackfillStateTableHandler::new_committed_reader`].
/// Its construction waits for the epoch to be committed, so reads through it
/// observe all data committed in that epoch (including other parallelisms' writes).
pub(super) struct BackfillStateTableCommittedReader<'a, S: StateStore> {
    // Borrow of the owning handler; all reads delegate to it.
    handle: &'a BackfillStateTableHandler<S>,
}
160
161impl<S: StateStore> BackfillStateTableCommittedReader<'_, S> {
162    pub(super) async fn try_recover_from_state_store(
163        &self,
164        split_id: &SplitId,
165    ) -> StreamExecutorResult<Option<BackfillStateWithProgress>> {
166        Ok(self
167            .handle
168            .get(split_id)
169            .await?
170            .map(|row| match row.datum_at(1) {
171                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
172                    BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())
173                }
174                _ => unreachable!(),
175            })
176            .transpose()?)
177    }
178}