// risingwave_stream/executor/backfill/cdc/state_v2.rs
use anyhow::anyhow;
16use risingwave_common::row;
17use risingwave_common::types::{JsonbVal, ScalarImpl};
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_connector::source::cdc::external::CdcOffset;
20use risingwave_storage::StateStore;
21
22use crate::common::table::state_table::StateTable;
23use crate::executor::StreamExecutorResult;
24
/// In-memory snapshot of one split's CDC backfill progress, restored from and
/// persisted to the backfill state table.
#[derive(Debug, Default)]
pub struct CdcStateRecord {
    /// Whether the backfill for this split has completed.
    pub is_finished: bool,
    /// Number of rows backfilled so far. Currently persisted but not read.
    #[expect(dead_code)]
    pub row_count: i64,
    // NOTE(review): presumably the lower/upper bounds of CDC offsets observed
    // during backfill — semantics inferred from the names; confirm with callers.
    // Both are `None` when restored from a legacy 3-column state table.
    pub cdc_offset_low: Option<CdcOffset>,
    pub cdc_offset_high: Option<CdcOffset>,
}
33
/// Persistent state for parallelized CDC backfill, backed by a [`StateTable`].
///
/// Supports two on-disk schemas, detected from the table's column count:
/// the legacy 3-column layout (split_id, backfill_finished, row_count) and the
/// current 5-column layout that additionally stores the low/high CDC offsets
/// as JSONB.
pub struct ParallelizedCdcBackfillState<S: StateStore> {
    state_table: StateTable<S>,
    /// Number of columns in a state row: 3 for legacy tables, 5 otherwise.
    state_len: usize,
    /// True when the underlying table uses the legacy 3-column schema.
    is_legacy_state: bool,
}
41
42impl<S: StateStore> ParallelizedCdcBackfillState<S> {
43 pub fn new(state_table: StateTable<S>) -> Self {
44 let is_legacy_state = state_table.get_data_types().len() == 3;
45 let state_len = if is_legacy_state { 3 } else { 5 };
46 Self {
47 state_table,
48 state_len,
49 is_legacy_state,
50 }
51 }
52
53 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
54 self.state_table.init_epoch(epoch).await
55 }
56
57 pub async fn restore_state(&mut self, split_id: i64) -> StreamExecutorResult<CdcStateRecord> {
59 let key = Some(split_id);
60 match self
61 .state_table
62 .get_row(row::once(key.map(ScalarImpl::from)))
63 .await?
64 {
65 Some(row) => {
66 tracing::info!("restored cdc backfill state: {:?}", row);
67 let state = row.into_inner().into_vec();
68 let is_finished = match state[1] {
69 Some(ScalarImpl::Bool(val)) => val,
70 _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
71 };
72 let row_count = match state[2] {
73 Some(ScalarImpl::Int64(val)) => val,
74 _ => return Err(anyhow!("invalid backfill state: row_count").into()),
75 };
76 let (cdc_offset_low, cdc_offset_high) = if !self.is_legacy_state {
77 let cdc_offset_low = match state[3] {
78 Some(ScalarImpl::Jsonb(ref jsonb)) => {
79 serde_json::from_value(jsonb.clone().take()).unwrap()
80 }
81 None => None,
82 _ => return Err(anyhow!("invalid backfill state: cdc_offset_low").into()),
83 };
84 let cdc_offset_high = match state[4] {
85 Some(ScalarImpl::Jsonb(ref jsonb)) => {
86 serde_json::from_value(jsonb.clone().take()).unwrap()
87 }
88 None => None,
89 _ => return Err(anyhow!("invalid backfill state: cdc_offset_high").into()),
90 };
91 (cdc_offset_low, cdc_offset_high)
92 } else {
93 (None, None)
94 };
95
96 Ok(CdcStateRecord {
97 is_finished,
98 row_count,
99 cdc_offset_low,
100 cdc_offset_high,
101 })
102 }
103 None => Ok(CdcStateRecord::default()),
104 }
105 }
106
107 pub async fn mutate_state(
109 &mut self,
110 split_id: i64,
111 is_finished: bool,
112 row_count: u64,
113 cdc_offset_low: Option<CdcOffset>,
114 cdc_offset_high: Option<CdcOffset>,
115 ) -> StreamExecutorResult<()> {
116 let mut state = vec![None; self.state_len];
118 let split_id = Some(ScalarImpl::from(split_id));
119 state[0].clone_from(&split_id);
120 state[1] = Some(is_finished.into());
121 state[2] = Some((row_count as i64).into());
122 if !self.is_legacy_state {
123 state[3] = cdc_offset_low.map(|cdc_offset| {
124 let json = serde_json::to_value(cdc_offset).unwrap();
125 ScalarImpl::Jsonb(JsonbVal::from(json))
126 });
127 state[4] = cdc_offset_high.map(|cdc_offset| {
128 let json = serde_json::to_value(cdc_offset).unwrap();
129 ScalarImpl::Jsonb(JsonbVal::from(json))
130 });
131 }
132
133 match self.state_table.get_row(row::once(split_id)).await? {
134 Some(prev_row) => {
135 self.state_table.update(prev_row, state.as_slice());
136 }
137 None => {
138 self.state_table.insert(state.as_slice());
139 }
140 }
141 Ok(())
142 }
143
144 pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
146 self.state_table
147 .commit_assert_no_update_vnode_bitmap(new_epoch)
148 .await
149 }
150
151 pub fn is_legacy_state(&self) -> bool {
152 self.is_legacy_state
153 }
154}