risingwave_stream/executor/backfill/cdc/
state_v2.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use anyhow::anyhow;
16use risingwave_common::row;
17use risingwave_common::types::{JsonbVal, ScalarImpl};
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_connector::source::cdc::external::CdcOffset;
20use risingwave_storage::StateStore;
21
22use crate::common::table::state_table::StateTable;
23use crate::executor::StreamExecutorResult;
24
25#[derive(Debug, Default)]
26pub struct CdcStateRecord {
27    pub is_finished: bool,
28    #[expect(dead_code)]
29    pub row_count: i64,
30    pub cdc_offset_low: Option<CdcOffset>,
31    pub cdc_offset_high: Option<CdcOffset>,
32}
33
34/// state schema: | `split_id` | `backfill_finished` | `row_count` | `cdc_offset_low` | `cdc_offset_high` |
35/// legacy state schema: | `split_id` | `backfill_finished` | `row_count` |
36pub struct ParallelizedCdcBackfillState<S: StateStore> {
37    state_table: StateTable<S>,
38    state_len: usize,
39    is_legacy_state: bool,
40}
41
42impl<S: StateStore> ParallelizedCdcBackfillState<S> {
43    pub fn new(state_table: StateTable<S>) -> Self {
44        let is_legacy_state = state_table.get_data_types().len() == 3;
45        let state_len = if is_legacy_state { 3 } else { 5 };
46        Self {
47            state_table,
48            state_len,
49            is_legacy_state,
50        }
51    }
52
53    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
54        self.state_table.init_epoch(epoch).await
55    }
56
57    /// Restore the backfill state from storage
58    pub async fn restore_state(&mut self, split_id: i64) -> StreamExecutorResult<CdcStateRecord> {
59        let key = Some(split_id);
60        match self
61            .state_table
62            .get_row(row::once(key.map(ScalarImpl::from)))
63            .await?
64        {
65            Some(row) => {
66                tracing::info!("restored cdc backfill state: {:?}", row);
67                let state = row.into_inner().into_vec();
68                let is_finished = match state[1] {
69                    Some(ScalarImpl::Bool(val)) => val,
70                    _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
71                };
72                let row_count = match state[2] {
73                    Some(ScalarImpl::Int64(val)) => val,
74                    _ => return Err(anyhow!("invalid backfill state: row_count").into()),
75                };
76                let (cdc_offset_low, cdc_offset_high) = if !self.is_legacy_state {
77                    let cdc_offset_low = match state[3] {
78                        Some(ScalarImpl::Jsonb(ref jsonb)) => {
79                            serde_json::from_value(jsonb.clone().take()).unwrap()
80                        }
81                        None => None,
82                        _ => return Err(anyhow!("invalid backfill state: cdc_offset_low").into()),
83                    };
84                    let cdc_offset_high = match state[4] {
85                        Some(ScalarImpl::Jsonb(ref jsonb)) => {
86                            serde_json::from_value(jsonb.clone().take()).unwrap()
87                        }
88                        None => None,
89                        _ => return Err(anyhow!("invalid backfill state: cdc_offset_high").into()),
90                    };
91                    (cdc_offset_low, cdc_offset_high)
92                } else {
93                    (None, None)
94                };
95
96                Ok(CdcStateRecord {
97                    is_finished,
98                    row_count,
99                    cdc_offset_low,
100                    cdc_offset_high,
101                })
102            }
103            None => Ok(CdcStateRecord::default()),
104        }
105    }
106
107    /// Modify the state of the corresponding split
108    pub async fn mutate_state(
109        &mut self,
110        split_id: i64,
111        is_finished: bool,
112        row_count: u64,
113        cdc_offset_low: Option<CdcOffset>,
114        cdc_offset_high: Option<CdcOffset>,
115    ) -> StreamExecutorResult<()> {
116        // schema: | `split_id` | `backfill_finished` | `row_count` | `cdc_offset_low` | `cdc_offset_high` |
117        let mut state = vec![None; self.state_len];
118        let split_id = Some(ScalarImpl::from(split_id));
119        state[0].clone_from(&split_id);
120        state[1] = Some(is_finished.into());
121        state[2] = Some((row_count as i64).into());
122        if !self.is_legacy_state {
123            state[3] = cdc_offset_low.map(|cdc_offset| {
124                let json = serde_json::to_value(cdc_offset).unwrap();
125                ScalarImpl::Jsonb(JsonbVal::from(json))
126            });
127            state[4] = cdc_offset_high.map(|cdc_offset| {
128                let json = serde_json::to_value(cdc_offset).unwrap();
129                ScalarImpl::Jsonb(JsonbVal::from(json))
130            });
131        }
132
133        match self.state_table.get_row(row::once(split_id)).await? {
134            Some(prev_row) => {
135                self.state_table.update(prev_row, state.as_slice());
136            }
137            None => {
138                self.state_table.insert(state.as_slice());
139            }
140        }
141        Ok(())
142    }
143
144    /// Persist the state to storage
145    pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
146        self.state_table
147            .commit_assert_no_update_vnode_bitmap(new_epoch)
148            .await
149    }
150
151    pub fn is_legacy_state(&self) -> bool {
152        self.is_legacy_state
153    }
154}