1use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::id::JobId;
19use risingwave_common::row::{OwnedRow, Row};
20use risingwave_connector::source::CdcTableSnapshotSplitRaw;
21use risingwave_connector::source::cdc::external::CDC_TABLE_SPLIT_ID_START;
22use risingwave_connector::source::cdc::{
23 INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID, INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
24};
25use risingwave_meta_model::{FragmentId, cdc_table_snapshot_split};
26use risingwave_pb::id::ActorId;
27use risingwave_pb::source::PbCdcTableSnapshotSplits;
28use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
29use sea_orm::prelude::Expr;
30use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter};
31
32use crate::MetaResult;
33use crate::stream::cdc::{
34 CdcTableSnapshotSplits, assign_cdc_table_snapshot_splits, single_merged_split,
35};
36
/// Lifecycle state of the snapshot backfill of one CDC table, as seen by
/// `CdcTableBackfillTracker`.
#[derive(Debug)]
enum CdcBackfillStatus {
    /// Splits are still being backfilled; carries the counters of the current
    /// split-assignment generation.
    Backfilling(CdcBackfillProgress),
    /// Every split of the current generation has reported `done`, but the
    /// completion has not yet been consumed by `take_pre_completed`.
    PreCompleted,
    /// Backfill is finished: either `PreCompleted` was consumed, or the tracker
    /// was built from already-`Finished` splits.
    Completed,
}
43
/// Counters for the in-flight split assignment while a table is backfilling.
#[derive(Debug)]
struct CdcBackfillProgress {
    /// All snapshot splits of the table being backfilled.
    splits: Vec<CdcTableSnapshotSplitRaw>,
    /// Sum of the split-range sizes of every report accepted for the current
    /// generation, whether or not the report was `done`.
    split_backfilled_count: u64,
    /// Sum of the split-range sizes of every `done` report accepted for the
    /// current generation.
    split_completed_count: u64,
    /// Generation of the current split assignment; reports carrying any other
    /// generation are ignored.
    split_assignment_generation: u64,
}
54
/// Point-in-time progress report produced by
/// `CdcTableBackfillTracker::gen_cdc_progress`.
///
/// Once the backfill is pre-completed or completed, all three counters are
/// reported as `1`.
#[derive(Debug)]
pub struct CdcProgress {
    /// Total number of snapshot splits being tracked.
    pub split_total_count: u64,
    /// Number of splits covered by progress reports of the current generation.
    pub split_backfilled_count: u64,
    /// Number of splits reported as `done` in the current generation.
    pub split_completed_count: u64,
}
64
65impl CdcTableBackfillTracker {
66 pub async fn mark_complete_job(txn: &impl ConnectionTrait, job_id: JobId) -> MetaResult<()> {
67 let bound = OwnedRow::new(vec![None]).value_serialize();
69 cdc_table_snapshot_split::Entity::update_many()
70 .col_expr(
71 cdc_table_snapshot_split::Column::IsBackfillFinished,
72 Expr::value(1),
73 )
74 .col_expr(
75 cdc_table_snapshot_split::Column::Left,
76 Expr::value(bound.clone()),
77 )
78 .col_expr(cdc_table_snapshot_split::Column::Right, Expr::value(bound))
79 .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id))
80 .filter(cdc_table_snapshot_split::Column::SplitId.eq(CDC_TABLE_SPLIT_ID_START))
81 .exec(txn)
82 .await?;
83 cdc_table_snapshot_split::Entity::delete_many()
85 .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id))
86 .filter(cdc_table_snapshot_split::Column::SplitId.gt(CDC_TABLE_SPLIT_ID_START))
87 .exec(txn)
88 .await?;
89 Ok(())
90 }
91}
92
/// Tracks the snapshot-backfill progress of a single CDC table and hands out
/// split assignments tagged with monotonically increasing generations.
#[derive(Debug)]
pub(super) struct CdcTableBackfillTracker {
    /// Current lifecycle state of the backfill.
    status: CdcBackfillStatus,
    /// Fragment id supplied at construction; returned by `cdc_scan_fragment_id`.
    cdc_scan_fragment_id: FragmentId,
    /// Generation that the next call to `reassign_splits` will use.
    next_generation: u64,
}
99
100impl CdcTableBackfillTracker {
101 fn new_inner(cdc_scan_fragment_id: FragmentId, splits: CdcTableSnapshotSplits) -> Self {
102 let status = match splits {
103 CdcTableSnapshotSplits::Backfilling(splits) => {
104 CdcBackfillStatus::Backfilling(CdcBackfillProgress {
105 splits,
106 split_backfilled_count: 0,
107 split_completed_count: 0,
108 split_assignment_generation: INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
109 })
110 }
111 CdcTableSnapshotSplits::Finished => CdcBackfillStatus::Completed,
112 };
113 Self {
114 status,
115 cdc_scan_fragment_id,
116 next_generation: INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID + 1,
117 }
118 }
119
120 pub fn restore(cdc_scan_fragment_id: FragmentId, splits: CdcTableSnapshotSplits) -> Self {
121 Self::new_inner(cdc_scan_fragment_id, splits)
122 }
123
124 pub fn new(cdc_scan_fragment_id: FragmentId, splits: Vec<CdcTableSnapshotSplitRaw>) -> Self {
125 Self::new_inner(
126 cdc_scan_fragment_id,
127 CdcTableSnapshotSplits::Backfilling(splits),
128 )
129 }
130
131 pub fn cdc_scan_fragment_id(&self) -> FragmentId {
132 self.cdc_scan_fragment_id
133 }
134
135 pub fn update_split_progress(&mut self, progress: &PbCdcTableBackfillProgress) {
136 tracing::debug!(?progress, "Complete split.");
137 let current_progress = match &mut self.status {
138 CdcBackfillStatus::Backfilling(progress) => progress,
139 CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => {
140 return;
141 }
142 };
143 assert_ne!(
144 progress.generation,
145 INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID
146 );
147 if current_progress.split_assignment_generation == progress.generation {
148 current_progress.split_backfilled_count +=
149 (1 + progress.split_id_end_inclusive - progress.split_id_start_inclusive) as u64;
150 if progress.done {
151 current_progress.split_completed_count += (1 + progress.split_id_end_inclusive
152 - progress.split_id_start_inclusive)
153 as u64;
154 if current_progress.split_completed_count == current_progress.splits.len() as u64 {
155 self.status = CdcBackfillStatus::PreCompleted;
156 }
157 }
158 }
159 }
160
161 pub fn reassign_splits(
162 &mut self,
163 actor_ids: HashSet<ActorId>,
164 ) -> MetaResult<HashMap<ActorId, PbCdcTableSnapshotSplits>> {
165 let generation = self.next_generation;
166 self.next_generation += 1;
167 let splits = match &mut self.status {
168 CdcBackfillStatus::Backfilling(progress) => {
169 progress.split_backfilled_count = 0;
170 progress.split_completed_count = 0;
171 progress.split_assignment_generation = generation;
172 progress.splits.as_slice()
173 }
174 CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => {
175 static SINGLE_SPLIT: LazyLock<CdcTableSnapshotSplitRaw> =
176 LazyLock::new(single_merged_split);
177 core::slice::from_ref(&*SINGLE_SPLIT)
178 }
179 };
180 assign_cdc_table_snapshot_splits(actor_ids, splits, generation)
181 }
182
183 pub fn gen_cdc_progress(&self) -> CdcProgress {
184 match &self.status {
185 CdcBackfillStatus::Backfilling(progress) => CdcProgress {
186 split_total_count: progress.splits.len() as _,
187 split_backfilled_count: progress.split_backfilled_count,
188 split_completed_count: progress.split_completed_count,
189 },
190 CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => CdcProgress {
191 split_total_count: 1,
192 split_backfilled_count: 1,
193 split_completed_count: 1,
194 },
195 }
196 }
197
198 pub fn take_pre_completed(&mut self) -> bool {
199 if let CdcBackfillStatus::PreCompleted = &self.status {
200 self.status = CdcBackfillStatus::Completed;
201 true
202 } else {
203 false
204 }
205 }
206}
207
#[cfg(test)]
mod test {

    use risingwave_connector::source::CdcTableSnapshotSplitRaw;
    use risingwave_pb::stream_service::barrier_complete_response::CdcTableBackfillProgress;

    use crate::barrier::cdc_progress::{
        CdcBackfillProgress, CdcBackfillStatus, CdcTableBackfillTracker,
    };

    impl CdcTableBackfillTracker {
        /// Test-only accessor for the in-flight counters; panics unless the
        /// tracker is still backfilling.
        fn progress(&self) -> &CdcBackfillProgress {
            match &self.status {
                CdcBackfillStatus::Backfilling(progress) => progress,
                CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => unreachable!(),
            }
        }
    }

    /// Exercises generation bumping on reassignment, rejection of stale-generation
    /// reports, and the final transition to pre-completed.
    #[tokio::test]
    async fn test_generation() {
        let split_count = 10u64;
        let splits: Vec<CdcTableSnapshotSplitRaw> = (0..split_count)
            .map(|split_id| CdcTableSnapshotSplitRaw {
                split_id: split_id as _,
                left_bound_inclusive: vec![],
                right_bound_exclusive: vec![],
            })
            .collect();
        let mut tracker = CdcTableBackfillTracker::new(233.into(), splits);

        // Builds a `done` report covering split ids `start..=end`.
        let done_report =
            |start, end, generation_id, fragment_id| CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: start,
                split_id_end_inclusive: end,
                generation: generation_id,
                fragment_id,
                ..Default::default()
            };

        assert_eq!(tracker.next_generation, 2);

        // First reassignment: generation 2, clean counters.
        tracker
            .reassign_splits([1.into()].into_iter().collect())
            .unwrap();
        let generation = tracker.progress().split_assignment_generation;
        assert_eq!(generation, 2);
        assert_init_state(&tracker, split_count);

        for report in &[
            done_report(1, 2, generation, 12.into()),
            done_report(5, 10, generation, 11.into()),
        ] {
            tracker.update_split_progress(report);
        }
        assert_eq!(tracker.progress().split_completed_count, 8);

        // Second reassignment bumps to generation 3 and discards partial progress.
        tracker
            .reassign_splits([1.into()].into_iter().collect())
            .unwrap();
        let generation = tracker.progress().split_assignment_generation;
        assert_eq!(generation, 3);
        assert_init_state(&tracker, split_count);

        // A report from the superseded generation must be ignored.
        tracker.update_split_progress(&done_report(3, 4, generation - 1, 13.into()));
        assert_init_state(&tracker, split_count);
        assert_eq!(tracker.progress().split_completed_count, 0);

        // Finishing every split under the current generation pre-completes the job.
        for report in &[
            done_report(1, 2, generation, 12.into()),
            done_report(5, 10, generation, 11.into()),
            done_report(3, 4, generation, 13.into()),
        ] {
            tracker.update_split_progress(report);
        }
        assert!(tracker.take_pre_completed());
    }

    /// Asserts the tracker is still backfilling `split_count` splits with both
    /// counters at zero.
    fn assert_init_state(tracker: &CdcTableBackfillTracker, split_count: u64) {
        let progress = tracker.progress();
        assert_eq!(progress.splits.len() as u64, split_count);
        assert_eq!(progress.split_completed_count, 0);
        assert_eq!(progress.split_backfilled_count, 0);
    }
}