risingwave_stream/executor/backfill/cdc/
state_v2.rs1use anyhow::anyhow;
16use risingwave_common::row;
17use risingwave_common::types::ScalarImpl;
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_storage::StateStore;
20
21use crate::common::table::state_table::StateTable;
22use crate::executor::StreamExecutorResult;
23
/// Backfill progress for a single split, as loaded from the state table.
///
/// The `Default` value (`is_finished == false`, `row_count == 0`) is what
/// `restore_state` returns when no row exists for the split.
#[derive(Default, Debug)]
pub struct CdcStateRecord {
    /// Value of the `backfill_finished` state column (column 1).
    pub is_finished: bool,
    /// Value of the `row_count` state column (column 2). Not read anywhere in this
    /// file, hence the `dead_code` expectation.
    #[expect(dead_code)]
    pub row_count: i64,
}
30
31pub struct ParallelizedCdcBackfillState<S: StateStore> {
33 state_table: StateTable<S>,
34 state_len: usize,
35}
36
37impl<S: StateStore> ParallelizedCdcBackfillState<S> {
38 pub fn new(state_table: StateTable<S>, state_len: usize) -> Self {
39 Self {
40 state_table,
41 state_len,
42 }
43 }
44
45 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
46 self.state_table.init_epoch(epoch).await
47 }
48
49 pub async fn restore_state(&mut self, split_id: i64) -> StreamExecutorResult<CdcStateRecord> {
51 let key = Some(split_id);
52 match self
53 .state_table
54 .get_row(row::once(key.map(ScalarImpl::from)))
55 .await?
56 {
57 Some(row) => {
58 tracing::info!("restored cdc backfill state: {:?}", row);
59 let state = row.into_inner().into_vec();
60 let is_finished = match state[1] {
61 Some(ScalarImpl::Bool(val)) => val,
62 _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
63 };
64 let row_count = match state[2] {
65 Some(ScalarImpl::Int64(val)) => val,
66 _ => return Err(anyhow!("invalid backfill state: row_count").into()),
67 };
68
69 Ok(CdcStateRecord {
70 is_finished,
71 row_count,
72 })
73 }
74 None => Ok(CdcStateRecord::default()),
75 }
76 }
77
78 pub async fn mutate_state(
80 &mut self,
81 split_id: i64,
82 is_finished: bool,
83 row_count: u64,
84 ) -> StreamExecutorResult<()> {
85 let mut state = vec![None; self.state_len];
87 let split_id = Some(ScalarImpl::from(split_id));
88 state[0].clone_from(&split_id);
89 state[1] = Some(is_finished.into());
90 state[2] = Some((row_count as i64).into());
91 match self.state_table.get_row(row::once(split_id)).await? {
92 Some(prev_row) => {
93 self.state_table.update(prev_row, state.as_slice());
94 }
95 None => {
96 self.state_table.insert(state.as_slice());
97 }
98 }
99 Ok(())
100 }
101
102 pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
104 self.state_table
105 .commit_assert_no_update_vnode_bitmap(new_epoch)
106 .await
107 }
108}