risingwave_stream/executor/backfill/cdc/
state_v2.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::row;
17use risingwave_common::types::ScalarImpl;
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_storage::StateStore;
20
21use crate::common::table::state_table::StateTable;
22use crate::executor::StreamExecutorResult;
23
/// Backfill progress of a single CDC split, as decoded from one state-table row.
///
/// The `Default` value (`is_finished = false`, `row_count = 0`) is what
/// `ParallelizedCdcBackfillState::restore_state` returns when no row is stored for the split.
#[derive(Debug, Default)]
pub struct CdcStateRecord {
    /// Value of the `backfill_finished` column of the state row.
    pub is_finished: bool,
    /// Value of the `row_count` column of the state row.
    // NOTE(review): the `expect(dead_code)` below suggests nothing reads this field yet;
    // the expectation must be removed once a reader is added — confirm against callers.
    #[expect(dead_code)]
    pub row_count: i64,
}
30
/// Per-split backfill state persisted in a state table.
///
/// state schema: | `split_id` | `backfill_finished` | `row_count` |
///
/// Rows are looked up by `split_id` alone (see `restore_state` / `mutate_state`).
pub struct ParallelizedCdcBackfillState<S: StateStore> {
    /// Backing table that stores one row per split.
    state_table: StateTable<S>,
    /// Number of columns in every state row written by `mutate_state`; the first three
    /// columns follow the schema above and any further columns are written as `None`.
    state_len: usize,
}
36
37impl<S: StateStore> ParallelizedCdcBackfillState<S> {
38    pub fn new(state_table: StateTable<S>, state_len: usize) -> Self {
39        Self {
40            state_table,
41            state_len,
42        }
43    }
44
45    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
46        self.state_table.init_epoch(epoch).await
47    }
48
49    /// Restore the backfill state from storage
50    pub async fn restore_state(&mut self, split_id: i64) -> StreamExecutorResult<CdcStateRecord> {
51        let key = Some(split_id);
52        match self
53            .state_table
54            .get_row(row::once(key.map(ScalarImpl::from)))
55            .await?
56        {
57            Some(row) => {
58                tracing::info!("restored cdc backfill state: {:?}", row);
59                let state = row.into_inner().into_vec();
60                let is_finished = match state[1] {
61                    Some(ScalarImpl::Bool(val)) => val,
62                    _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
63                };
64                let row_count = match state[2] {
65                    Some(ScalarImpl::Int64(val)) => val,
66                    _ => return Err(anyhow!("invalid backfill state: row_count").into()),
67                };
68
69                Ok(CdcStateRecord {
70                    is_finished,
71                    row_count,
72                })
73            }
74            None => Ok(CdcStateRecord::default()),
75        }
76    }
77
78    /// Modify the state of the corresponding split
79    pub async fn mutate_state(
80        &mut self,
81        split_id: i64,
82        is_finished: bool,
83        row_count: u64,
84    ) -> StreamExecutorResult<()> {
85        // schema: | `split_id` | `backfill_finished` | `row_count` |
86        let mut state = vec![None; self.state_len];
87        let split_id = Some(ScalarImpl::from(split_id));
88        state[0].clone_from(&split_id);
89        state[1] = Some(is_finished.into());
90        state[2] = Some((row_count as i64).into());
91        match self.state_table.get_row(row::once(split_id)).await? {
92            Some(prev_row) => {
93                self.state_table.update(prev_row, state.as_slice());
94            }
95            None => {
96                self.state_table.insert(state.as_slice());
97            }
98        }
99        Ok(())
100    }
101
102    /// Persist the state to storage
103    pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
104        self.state_table
105            .commit_assert_no_update_vnode_bitmap(new_epoch)
106            .await
107    }
108}