risingwave_stream/executor/backfill/cdc/state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use risingwave_common::row;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{Datum, JsonbVal, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
use crate::executor::StreamExecutorResult;
#[derive(Debug, Default)]
pub struct CdcStateRecord {
    pub current_pk_pos: Option<OwnedRow>,
    pub is_finished: bool,
    /// The last cdc offset that has been consumed by the cdc backfill executor
    pub last_cdc_offset: Option<CdcOffset>,
    pub row_count: i64,
}

/// state schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
pub struct CdcBackfillState<S: StateStore> {
    /// Id of the backfilling table, which is used as the key of the state
    split_id: String,
    state_table: StateTable<S>,

    /// The cached state row, kept in the same layout as the state table schema
    cached_state: Vec<Datum>,
}
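
// For illustration only (hypothetical values, not taken from a real job):
// with a single-column primary key `id INT`, `state_len` is 5
// (1 split_id + 1 pk + 3 trailing metadata columns) and a persisted row
// could look like
//
//   | split_id | id | backfill_finished | row_count | cdc_offset         |
//   | "1001"   | 42 | false             | 1000      | {"MySql": { ... }} |
//
// where `cdc_offset` holds the JSONB-serialized `CdcOffset`.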

impl<S: StateStore> CdcBackfillState<S> {
    pub fn new(table_id: u32, state_table: StateTable<S>, state_len: usize) -> Self {
        Self {
            split_id: table_id.to_string(),
            state_table,
            cached_state: vec![None; state_len],
        }
    }

    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state_table.init_epoch(epoch).await
    }

    /// Restore the backfill state from storage
    pub async fn restore_state(&mut self) -> StreamExecutorResult<CdcStateRecord> {
        let key = Some(self.split_id.clone());
        match self
            .state_table
            .get_row(row::once(key.map(ScalarImpl::from)))
            .await?
        {
            Some(row) => {
                tracing::info!("restored cdc backfill state: {:?}", row);
                self.cached_state = row.into_inner().into_vec();
                let state = self.cached_state.as_slice();
                let state_len = state.len();
                // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
                // `cdc_offset` is legitimately NULL before any offset has been
                // persisted (see `mutate_state`), so a missing value restores
                // to `None` rather than an error.
                let cdc_offset = match state[state_len - 1] {
                    Some(ScalarImpl::Jsonb(ref jsonb)) => {
                        serde_json::from_value(jsonb.clone().take()).unwrap()
                    }
                    _ => None,
                };
                let row_count = match state[state_len - 2] {
                    Some(ScalarImpl::Int64(val)) => val,
                    _ => return Err(anyhow!("invalid backfill state: row_count").into()),
                };
                let is_finished = match state[state_len - 3] {
                    Some(ScalarImpl::Bool(val)) => val,
                    _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
                };

                // The pk columns sit between `split_id` and the three trailing
                // metadata columns.
                let current_pk_pos = state[1..state_len - 3].to_vec();
                Ok(CdcStateRecord {
                    current_pk_pos: Some(OwnedRow::new(current_pk_pos)),
                    is_finished,
                    last_cdc_offset: cdc_offset,
                    row_count,
                })
            }
            None => Ok(CdcStateRecord::default()),
        }
    }
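    // Continuing the illustrative row above, a restore would yield
    // `current_pk_pos = Some(OwnedRow::new(vec![Some(ScalarImpl::Int32(42))]))`,
    // `is_finished = false`, `row_count = 1000`, and a `Some(..)` cdc offset.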

    /// Modify the state of the corresponding split (currently only supports single split)
    pub async fn mutate_state(
        &mut self,
        current_pk_pos: Option<OwnedRow>,
        last_cdc_offset: Option<CdcOffset>,
        row_count: u64,
        is_finished: bool,
    ) -> StreamExecutorResult<()> {
        // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
        let state = self.cached_state.as_mut_slice();
        let split_id = Some(ScalarImpl::from(self.split_id.clone()));
        let state_len = state.len();
        state[0].clone_from(&split_id);
        if let Some(current_pk_pos) = &current_pk_pos {
            state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner());
        }
        state[state_len - 3] = Some(is_finished.into());
        state[state_len - 2] = Some((row_count as i64).into());
        state[state_len - 1] = last_cdc_offset.clone().map(|cdc_offset| {
            let json = serde_json::to_value(cdc_offset).unwrap();
            ScalarImpl::Jsonb(JsonbVal::from(json))
        });

        match self.state_table.get_row(row::once(split_id)).await? {
            Some(prev_row) => {
                self.state_table
                    .update(prev_row, self.cached_state.as_slice());
            }
            None => {
                self.state_table.insert(self.cached_state.as_slice());
            }
        }
        Ok(())
    }
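
    // Design note: the write above is a read-modify-write rather than a blind
    // upsert because `StateTable::update` takes the previous row alongside the
    // new one (presumably so the underlying mem-table can record a proper
    // old-value/new-value pair); `insert` covers the very first write.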

    /// Persist the state to storage
    pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state_table
            .commit_assert_no_update_vnode_bitmap(new_epoch)
            .await
    }
}
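
// A minimal sketch of the intended call sequence, assuming the surrounding
// executor supplies the state table and the barrier epochs. `usage_sketch`,
// `table_id = 1001`, and `state_len = 5` are illustrative, not part of the
// upstream code.
#[allow(dead_code)]
async fn usage_sketch<S: StateStore>(
    state_table: StateTable<S>,
    first_epoch: EpochPair,
    commit_epoch: EpochPair,
) -> StreamExecutorResult<()> {
    // state_len = 1 (split_id) + #pk columns + 3 trailing metadata columns.
    let mut state = CdcBackfillState::new(1001, state_table, 5);
    state.init_epoch(first_epoch).await?;

    // Recover where a previous run left off (defaults if nothing was stored).
    let record = state.restore_state().await?;

    // ... backfill rows starting from `record.current_pk_pos` ...

    // Persist the new position, then commit it with the next barrier.
    state
        .mutate_state(
            record.current_pk_pos,
            record.last_cdc_offset,
            record.row_count as u64,
            record.is_finished,
        )
        .await?;
    state.commit_state(commit_epoch).await
}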