risingwave_stream/executor/backfill/cdc/state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::anyhow;
use risingwave_common::id::TableId;
use risingwave_common::row;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{Datum, JsonbVal, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
use crate::executor::StreamExecutorResult;

#[derive(Debug, Default)]
pub struct CdcStateRecord {
    pub current_pk_pos: Option<OwnedRow>,
    pub is_finished: bool,
    /// The last cdc offset that has been consumed by the cdc backfill executor
    pub last_cdc_offset: Option<CdcOffset>,
    pub row_count: i64,
}

/// state schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
pub struct CdcBackfillState<S: StateStore> {
    /// ID of the table being backfilled; used as the key of the state row
    split_id: String,
    state_table: StateTable<S>,

    cached_state: Vec<Datum>,
}
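
// An illustrative layout of the state row for a table with a two-column
// primary key (the values below are hypothetical):
//
//   | split_id | pk_0 | pk_1 | backfill_finished | row_count | cdc_offset           |
//   | "1001"   | 42   | 7    | false             | 1024      | JSONB-encoded offset |
//
// The width of the `pk...` section depends on the upstream table's primary
// key, so the fixed fields are always addressed relative to the end of the row.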

impl<S: StateStore> CdcBackfillState<S> {
    pub fn new(table_id: TableId, state_table: StateTable<S>, state_len: usize) -> Self {
        Self {
            split_id: table_id.to_string(),
            state_table,
            cached_state: vec![None; state_len],
        }
    }

    pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
        self.state_table.init_epoch(epoch).await
    }
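
    // Typical lifecycle, as driven by the CDC backfill executor (a sketch;
    // the exact call sites live in the executor, not in this file):
    //
    //   state.init_epoch(first_barrier.epoch).await?;  // on the first barrier
    //   let restored = state.restore_state().await?;   // resume from a checkpoint
    //   // ... backfill upstream chunks, then on progress:
    //   state.mutate_state(pk_pos, offset, count, done).await?;
    //   state.commit_state(barrier.epoch).await?;      // on every barrier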

    /// Restore the backfill state from storage
    pub async fn restore_state(&mut self) -> StreamExecutorResult<CdcStateRecord> {
        let key = Some(self.split_id.clone());
        match self
            .state_table
            .get_row(row::once(key.map(ScalarImpl::from)))
            .await?
        {
            Some(row) => {
                tracing::info!("restored cdc backfill state: {:?}", row);
                self.cached_state = row.into_inner().into_vec();
                let state = self.cached_state.as_slice();
                let state_len = state.len();
                // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
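                // The pk can span a variable number of columns, so the three
                // fixed fields are decoded from the tail of the row.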
                let cdc_offset = match state[state_len - 1] {
                    Some(ScalarImpl::Jsonb(ref jsonb)) => {
                        serde_json::from_value(jsonb.clone().take()).unwrap()
                    }
                    _ => return Err(anyhow!("invalid backfill state: cdc_offset").into()),
                };
                let row_count = match state[state_len - 2] {
                    Some(ScalarImpl::Int64(val)) => val,
                    _ => return Err(anyhow!("invalid backfill state: row_count").into()),
                };
                let is_finished = match state[state_len - 3] {
                    Some(ScalarImpl::Bool(val)) => val,
                    _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
                };

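                // Everything between the split key and the three fixed tail
                // fields is the pk position reached so far.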
                let current_pk_pos = state[1..state_len - 3].to_vec();
                Ok(CdcStateRecord {
                    current_pk_pos: Some(OwnedRow::new(current_pk_pos)),
                    is_finished,
                    last_cdc_offset: cdc_offset,
                    row_count,
                })
            }
            None => Ok(CdcStateRecord::default()),
        }
    }

    /// Modify the state of the corresponding split (currently only a single split is supported)
    pub async fn mutate_state(
        &mut self,
        current_pk_pos: Option<OwnedRow>,
        last_cdc_offset: Option<CdcOffset>,
        row_count: u64,
        is_finished: bool,
    ) -> StreamExecutorResult<()> {
        // schema: | `split_id` | `pk...` | `backfill_finished` | `row_count` | `cdc_offset` |
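        // Update the cached state in place. Slots not covered by the current
        // pk (e.g. when `current_pk_pos` is `None`) keep their previous values.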
        let state = self.cached_state.as_mut_slice();
        let split_id = Some(ScalarImpl::from(self.split_id.clone()));
        let state_len = state.len();
        state[0].clone_from(&split_id);
        if let Some(current_pk_pos) = &current_pk_pos {
            state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner());
        }
        state[state_len - 3] = Some(is_finished.into());
        state[state_len - 2] = Some((row_count as i64).into());
        state[state_len - 1] = last_cdc_offset.clone().map(|cdc_offset| {
            let json = serde_json::to_value(cdc_offset).unwrap();
            ScalarImpl::Jsonb(JsonbVal::from(json))
        });

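        // Upsert: `StateTable::update` needs the previous row to encode the
        // old-value part of the update, so look it up first.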
        match self.state_table.get_row(row::once(split_id)).await? {
            Some(prev_row) => {
                self.state_table
                    .update(prev_row, self.cached_state.as_slice());
            }
            None => {
                self.state_table.insert(self.cached_state.as_slice());
            }
        }
        Ok(())
    }

    /// Persist the state to storage
    pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
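        // Flush buffered writes at the barrier. The assert variant checks that
        // the commit does not come with a vnode bitmap change (presumably this
        // state table is not expected to be rescaled at this point).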
        self.state_table
            .commit_assert_no_update_vnode_bitmap(new_epoch)
            .await
    }
}