risingwave_stream/executor/backfill/cdc/
state.rs1use anyhow::anyhow;
16use risingwave_common::row;
17use risingwave_common::row::{OwnedRow, Row};
18use risingwave_common::types::{Datum, JsonbVal, ScalarImpl};
19use risingwave_common::util::epoch::EpochPair;
20use risingwave_connector::source::cdc::external::CdcOffset;
21use risingwave_storage::StateStore;
22
23use crate::common::table::state_table::StateTable;
24use crate::executor::StreamExecutorResult;
25
26#[derive(Debug, Default)]
27pub struct CdcStateRecord {
28 pub current_pk_pos: Option<OwnedRow>,
29 pub is_finished: bool,
30 pub last_cdc_offset: Option<CdcOffset>,
32 pub row_count: i64,
33}
34
35pub struct CdcBackfillState<S: StateStore> {
37 split_id: String,
39 state_table: StateTable<S>,
40
41 cached_state: Vec<Datum>,
42}
43
44impl<S: StateStore> CdcBackfillState<S> {
45 pub fn new(table_id: u32, state_table: StateTable<S>, state_len: usize) -> Self {
46 Self {
47 split_id: table_id.to_string(),
48 state_table,
49 cached_state: vec![None; state_len],
50 }
51 }
52
53 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
54 self.state_table.init_epoch(epoch).await
55 }
56
57 pub async fn restore_state(&mut self) -> StreamExecutorResult<CdcStateRecord> {
59 let key = Some(self.split_id.clone());
60 match self
61 .state_table
62 .get_row(row::once(key.map(ScalarImpl::from)))
63 .await?
64 {
65 Some(row) => {
66 tracing::info!("restored cdc backfill state: {:?}", row);
67 self.cached_state = row.into_inner().into_vec();
68 let state = self.cached_state.as_slice();
69 let state_len = state.len();
70 let cdc_offset = match state[state_len - 1] {
72 Some(ScalarImpl::Jsonb(ref jsonb)) => {
73 serde_json::from_value(jsonb.clone().take()).unwrap()
74 }
75 _ => return Err(anyhow!("invalid backfill state: cdc_offset").into()),
76 };
77 let row_count = match state[state_len - 2] {
78 Some(ScalarImpl::Int64(val)) => val,
79 _ => return Err(anyhow!("invalid backfill state: row_count").into()),
80 };
81 let is_finished = match state[state_len - 3] {
82 Some(ScalarImpl::Bool(val)) => val,
83 _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
84 };
85
86 let current_pk_pos = state[1..state_len - 3].to_vec();
87 Ok(CdcStateRecord {
88 current_pk_pos: Some(OwnedRow::new(current_pk_pos)),
89 is_finished,
90 last_cdc_offset: cdc_offset,
91 row_count,
92 })
93 }
94 None => Ok(CdcStateRecord::default()),
95 }
96 }
97
98 pub async fn mutate_state(
100 &mut self,
101 current_pk_pos: Option<OwnedRow>,
102 last_cdc_offset: Option<CdcOffset>,
103 row_count: u64,
104 is_finished: bool,
105 ) -> StreamExecutorResult<()> {
106 let state = self.cached_state.as_mut_slice();
108 let split_id = Some(ScalarImpl::from(self.split_id.clone()));
109 let state_len = state.len();
110 state[0].clone_from(&split_id);
111 if let Some(current_pk_pos) = ¤t_pk_pos {
112 state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner());
113 }
114 state[state_len - 3] = Some(is_finished.into());
115 state[state_len - 2] = Some((row_count as i64).into());
116 state[state_len - 1] = last_cdc_offset.clone().map(|cdc_offset| {
117 let json = serde_json::to_value(cdc_offset).unwrap();
118 ScalarImpl::Jsonb(JsonbVal::from(json))
119 });
120
121 match self.state_table.get_row(row::once(split_id)).await? {
122 Some(prev_row) => {
123 self.state_table
124 .update(prev_row, self.cached_state.as_slice());
125 }
126 None => {
127 self.state_table.insert(self.cached_state.as_slice());
128 }
129 }
130 Ok(())
131 }
132
133 pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
135 self.state_table
136 .commit_assert_no_update_vnode_bitmap(new_epoch)
137 .await
138 }
139}