// risingwave_stream/executor/backfill/cdc/state.rs

use anyhow::anyhow;
use risingwave_common::id::TableId;
use risingwave_common::row;
use risingwave_common::row::{OwnedRow, Row};
use risingwave_common::types::{Datum, JsonbVal, ScalarImpl};
use risingwave_common::util::epoch::EpochPair;
use risingwave_connector::source::cdc::external::CdcOffset;
use risingwave_storage::StateStore;

use crate::common::table::state_table::StateTable;
use crate::executor::StreamExecutorResult;
/// A snapshot of CDC backfill progress, restored from (and persisted via) the
/// backfill state table.
#[derive(Debug, Default)]
pub struct CdcStateRecord {
    /// Primary-key position the backfill has reached; `None` when no state
    /// row has been persisted yet (see `Default`).
    pub current_pk_pos: Option<OwnedRow>,
    /// Whether the backfill has finished.
    pub is_finished: bool,
    /// Last CDC offset observed when the state was persisted; `None` if no
    /// offset had been recorded.
    pub last_cdc_offset: Option<CdcOffset>,
    /// Number of rows backfilled so far (stored as `Int64` in the state row).
    pub row_count: i64,
}
35
/// Persistent state handle for a CDC backfill, backed by a `StateTable`.
pub struct CdcBackfillState<S: StateStore> {
    /// Key of the state row: the table id rendered as a string (see `new`).
    split_id: String,
    /// Underlying state table used to persist and restore the state row.
    state_table: StateTable<S>,

    /// In-memory copy of the latest state row; populated by `restore_state`
    /// and rewritten by `mutate_state` before flushing.
    cached_state: Vec<Datum>,
}
44
45impl<S: StateStore> CdcBackfillState<S> {
46 pub fn new(table_id: TableId, state_table: StateTable<S>, state_len: usize) -> Self {
47 Self {
48 split_id: table_id.to_string(),
49 state_table,
50 cached_state: vec![None; state_len],
51 }
52 }
53
54 pub async fn init_epoch(&mut self, epoch: EpochPair) -> StreamExecutorResult<()> {
55 self.state_table.init_epoch(epoch).await
56 }
57
58 pub async fn restore_state(&mut self) -> StreamExecutorResult<CdcStateRecord> {
60 let key = Some(self.split_id.clone());
61 match self
62 .state_table
63 .get_row(row::once(key.map(ScalarImpl::from)))
64 .await?
65 {
66 Some(row) => {
67 tracing::info!("restored cdc backfill state: {:?}", row);
68 self.cached_state = row.into_inner().into_vec();
69 let state = self.cached_state.as_slice();
70 let state_len = state.len();
71 let cdc_offset = match state[state_len - 1] {
73 Some(ScalarImpl::Jsonb(ref jsonb)) => {
74 serde_json::from_value(jsonb.clone().take()).unwrap()
75 }
76 _ => return Err(anyhow!("invalid backfill state: cdc_offset").into()),
77 };
78 let row_count = match state[state_len - 2] {
79 Some(ScalarImpl::Int64(val)) => val,
80 _ => return Err(anyhow!("invalid backfill state: row_count").into()),
81 };
82 let is_finished = match state[state_len - 3] {
83 Some(ScalarImpl::Bool(val)) => val,
84 _ => return Err(anyhow!("invalid backfill state: backfill_finished").into()),
85 };
86
87 let current_pk_pos = state[1..state_len - 3].to_vec();
88 Ok(CdcStateRecord {
89 current_pk_pos: Some(OwnedRow::new(current_pk_pos)),
90 is_finished,
91 last_cdc_offset: cdc_offset,
92 row_count,
93 })
94 }
95 None => Ok(CdcStateRecord::default()),
96 }
97 }
98
99 pub async fn mutate_state(
101 &mut self,
102 current_pk_pos: Option<OwnedRow>,
103 last_cdc_offset: Option<CdcOffset>,
104 row_count: u64,
105 is_finished: bool,
106 ) -> StreamExecutorResult<()> {
107 let state = self.cached_state.as_mut_slice();
109 let split_id = Some(ScalarImpl::from(self.split_id.clone()));
110 let state_len = state.len();
111 state[0].clone_from(&split_id);
112 if let Some(current_pk_pos) = ¤t_pk_pos {
113 state[1..=current_pk_pos.len()].clone_from_slice(current_pk_pos.as_inner());
114 }
115 state[state_len - 3] = Some(is_finished.into());
116 state[state_len - 2] = Some((row_count as i64).into());
117 state[state_len - 1] = last_cdc_offset.clone().map(|cdc_offset| {
118 let json = serde_json::to_value(cdc_offset).unwrap();
119 ScalarImpl::Jsonb(JsonbVal::from(json))
120 });
121
122 match self.state_table.get_row(row::once(split_id)).await? {
123 Some(prev_row) => {
124 self.state_table
125 .update(prev_row, self.cached_state.as_slice());
126 }
127 None => {
128 self.state_table.insert(self.cached_state.as_slice());
129 }
130 }
131 Ok(())
132 }
133
134 pub async fn commit_state(&mut self, new_epoch: EpochPair) -> StreamExecutorResult<()> {
136 self.state_table
137 .commit_assert_no_update_vnode_bitmap(new_epoch)
138 .await
139 }
140}