// risingwave_stream/executor/source/source_backfill_state_table.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Bound;
16
17use futures::{StreamExt, pin_mut};
18use risingwave_common::row;
19use risingwave_common::row::{OwnedRow, Row};
20use risingwave_common::types::{ScalarImpl, ScalarRef, ScalarRefImpl};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_connector::source::SplitId;
23use risingwave_pb::catalog::PbTable;
24use risingwave_storage::StateStore;
25
26use super::source_backfill_executor::{BackfillStateWithProgress, BackfillStates};
27use crate::common::table::state_table::{StateTable, StateTableBuilder};
28use crate::executor::StreamExecutorResult;
29
/// Handler wrapping the state table that persists per-split backfill progress
/// ([`BackfillStateWithProgress`]) for the source backfill executor, keyed by split id.
pub struct BackfillStateTableHandler<S: StateStore> {
    // Underlying state table; rows are `(split_id utf8, state jsonb)` — see `get`/`set` below.
    state_store: StateTable<S>,
}
33
34impl<S: StateStore> BackfillStateTableHandler<S> {
35    /// See also [`super::SourceStateTableHandler::from_table_catalog`] for how the state table looks like.
36    pub async fn from_table_catalog(table_catalog: &PbTable, store: S) -> Self {
37        Self {
38            // Note: should not enable `preload_all_rows` for `StateTable` of source backfill
39            // because it uses storage to synchronize different parallelisms, which is a special
40            // access pattern that in-mem state table has not supported yet.
41            state_store: StateTableBuilder::new(table_catalog, store, None)
42                .forbid_preload_all_rows()
43                .build()
44                .await,
45        }
46    }
47
48    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
49        self.state_store.init_epoch(epoch).await
50    }
51
52    fn string_to_scalar(rhs: impl Into<String>) -> ScalarImpl {
53        ScalarImpl::Utf8(rhs.into().into_boxed_str())
54    }
55
56    pub(crate) async fn get(&self, key: &SplitId) -> StreamExecutorResult<Option<OwnedRow>> {
57        self.state_store
58            .get_row(row::once(Some(Self::string_to_scalar(key.as_ref()))))
59            .await
60    }
61
62    /// XXX: we might get stale data for other actors' writes, but it's fine?
63    pub async fn scan_may_stale(&self) -> StreamExecutorResult<Vec<BackfillStateWithProgress>> {
64        let sub_range: &(Bound<OwnedRow>, Bound<OwnedRow>) = &(Bound::Unbounded, Bound::Unbounded);
65
66        let state_table_iter = self
67            .state_store
68            .iter_with_prefix(None::<OwnedRow>, sub_range, Default::default())
69            .await?;
70        pin_mut!(state_table_iter);
71
72        let mut ret = vec![];
73        while let Some(item) = state_table_iter.next().await {
74            let row = item?.into_owned_row();
75            let state = match row.datum_at(1) {
76                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
77                    BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())?
78                }
79                _ => unreachable!(),
80            };
81            ret.push(state);
82        }
83        tracing::trace!("scan SourceBackfill state table: {:?}", ret);
84        Ok(ret)
85    }
86
87    async fn set(
88        &mut self,
89        key: SplitId,
90        state: BackfillStateWithProgress,
91    ) -> StreamExecutorResult<()> {
92        let row = [
93            Some(Self::string_to_scalar(key.as_ref())),
94            Some(ScalarImpl::Jsonb(state.encode_to_json())),
95        ];
96        match self.get(&key).await? {
97            Some(prev_row) => {
98                self.state_store.update(prev_row, row);
99            }
100            None => {
101                self.state_store.insert(row);
102            }
103        }
104        Ok(())
105    }
106
107    pub async fn delete(&mut self, key: &SplitId) -> StreamExecutorResult<()> {
108        if let Some(prev_row) = self.get(key).await? {
109            self.state_store.delete(prev_row);
110        }
111
112        Ok(())
113    }
114
115    pub async fn set_states(&mut self, states: BackfillStates) -> StreamExecutorResult<()> {
116        for (split_id, state) in states {
117            self.set(split_id, state).await?;
118        }
119        Ok(())
120    }
121
122    pub async fn trim_state(
123        &mut self,
124        to_trim: impl IntoIterator<Item = SplitId>,
125    ) -> StreamExecutorResult<()> {
126        for split_id in to_trim {
127            tracing::info!("trimming source state for split {}", split_id);
128            self.delete(&split_id).await?;
129        }
130
131        Ok(())
132    }
133
134    pub(super) fn state_store(&self) -> &StateTable<S> {
135        &self.state_store
136    }
137
138    pub(super) async fn commit(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
139        self.state_store
140            .commit_assert_no_update_vnode_bitmap(epoch)
141            .await?;
142        Ok(())
143    }
144
145    /// When calling `try_recover_from_state_store`, we may read the state written by other source parallelisms in
146    /// the previous `epoch`. Therefore, we need to explicitly create a `BackfillStateTableCommittedReader` to do
147    /// `try_recover_from_state_store`. Before returning the reader, we will do `try_wait_committed_epoch` to ensure
148    /// that we are able to read all data committed in `epoch`.
149    ///
150    /// Note that, we need to ensure that the barrier of `epoch` must have been yielded before creating the committed reader,
151    /// and otherwise the `try_wait_committed_epoch` will block the barrier of `epoch`, and cause deadlock.
152    pub(super) async fn new_committed_reader(
153        &self,
154        epoch: EpochPair,
155    ) -> StreamExecutorResult<BackfillStateTableCommittedReader<'_, S>> {
156        self.state_store
157            .try_wait_committed_epoch(epoch.prev)
158            .await?;
159        Ok(BackfillStateTableCommittedReader { handle: self })
160    }
161}
162
/// Read-only view over a [`BackfillStateTableHandler`], created via
/// [`BackfillStateTableHandler::new_committed_reader`]. Creation waits for the
/// previous epoch to be committed, so reads through this reader can observe all
/// data committed in that epoch (including other parallelisms' writes).
pub(super) struct BackfillStateTableCommittedReader<'a, S: StateStore> {
    handle: &'a BackfillStateTableHandler<S>,
}
166
167impl<S: StateStore> BackfillStateTableCommittedReader<'_, S> {
168    pub(super) async fn try_recover_from_state_store(
169        &self,
170        split_id: &SplitId,
171    ) -> StreamExecutorResult<Option<BackfillStateWithProgress>> {
172        Ok(self
173            .handle
174            .get(split_id)
175            .await?
176            .map(|row| match row.datum_at(1) {
177                Some(ScalarRefImpl::Jsonb(jsonb_ref)) => {
178                    BackfillStateWithProgress::restore_from_json(jsonb_ref.to_owned_scalar())
179                }
180                _ => unreachable!(),
181            })
182            .transpose()?)
183    }
184}