1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::Mutex;
20use risingwave_common::catalog::{FragmentTypeFlag, TableId};
21use risingwave_common::row::{OwnedRow, Row};
22use risingwave_connector::source::cdc::external::CDC_TABLE_SPLIT_ID_START;
23use risingwave_connector::source::cdc::{
24 INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID, INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
25};
26use risingwave_meta_model::{FragmentId, ObjectId, cdc_table_snapshot_split, fragment};
27use risingwave_pb::stream_service::PbBarrierCompleteResponse;
28use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
29use sea_orm::prelude::Expr;
30use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, TransactionTrait};
31
32use crate::MetaResult;
33use crate::controller::SqlMetaStore;
34use crate::controller::fragment::FragmentTypeMaskExt;
35
36pub type CdcTableBackfillTrackerRef = Arc<CdcTableBackfillTracker>;
37
38pub struct CdcTableBackfillTracker {
39 meta_store: SqlMetaStore,
40 inner: Mutex<CdcTableBackfillTrackerInner>,
41}
42
43#[derive(Clone)]
44pub struct CdcProgress {
45 pub split_total_count: u64,
47 pub split_backfilled_count: u64,
49 pub split_completed_count: u64,
51 split_assignment_generation: u64,
53 is_completed: bool,
54}
55
56impl CdcProgress {
57 fn restore(split_total_count: u64, generation: u64, is_completed: bool) -> Self {
58 Self {
59 split_total_count,
60 split_backfilled_count: 0,
61 split_completed_count: 0,
62 split_assignment_generation: generation,
63 is_completed,
64 }
65 }
66
67 fn new_partial(split_total_count: u64) -> Self {
69 Self {
70 split_total_count,
71 split_backfilled_count: 0,
72 split_completed_count: 0,
73 split_assignment_generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
74 is_completed: false,
75 }
76 }
77}
78
79impl CdcTableBackfillTracker {
80 pub async fn new(meta_store: SqlMetaStore) -> MetaResult<Self> {
81 let inner = CdcTableBackfillTrackerInner::new(meta_store.clone()).await?;
82 let inst = Self {
83 meta_store,
84 inner: Mutex::new(inner),
85 };
86 Ok(inst)
87 }
88
89 pub fn apply_collected_command(
90 &self,
91 resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
92 ) -> Vec<TableId> {
93 let mut inner = self.inner.lock();
94 let mut pre_completed_jobs = vec![];
95 for resp in resps {
96 for progress in &resp.cdc_table_backfill_progress {
97 pre_completed_jobs.extend(inner.update_split_progress(progress));
98 }
99 }
100 pre_completed_jobs.into_iter().map(Into::into).collect()
101 }
102
103 pub async fn complete_job(&self, job_id: TableId) -> MetaResult<()> {
104 let txn = self.meta_store.conn.begin().await?;
105 let bound = OwnedRow::new(vec![None]).value_serialize();
107 cdc_table_snapshot_split::Entity::update_many()
108 .col_expr(
109 cdc_table_snapshot_split::Column::IsBackfillFinished,
110 Expr::value(1),
111 )
112 .col_expr(
113 cdc_table_snapshot_split::Column::Left,
114 Expr::value(bound.clone()),
115 )
116 .col_expr(cdc_table_snapshot_split::Column::Right, Expr::value(bound))
117 .filter(
118 cdc_table_snapshot_split::Column::TableId
119 .eq(job_id.table_id as risingwave_meta_model::TableId),
120 )
121 .filter(cdc_table_snapshot_split::Column::SplitId.eq(CDC_TABLE_SPLIT_ID_START))
122 .exec(&txn)
123 .await?;
124 cdc_table_snapshot_split::Entity::delete_many()
126 .filter(
127 cdc_table_snapshot_split::Column::TableId
128 .eq(job_id.table_id as risingwave_meta_model::TableId),
129 )
130 .filter(cdc_table_snapshot_split::Column::SplitId.gt(CDC_TABLE_SPLIT_ID_START))
131 .exec(&txn)
132 .await?;
133 txn.commit().await?;
134 self.inner.lock().complete_job(job_id);
135 Ok(())
136 }
137
138 pub fn track_new_job(&self, job_id: u32, split_count: u64) {
141 self.inner.lock().track_new_job(job_id, split_count);
142 }
143
144 pub fn add_fragment_table_mapping(&self, fragment_ids: impl Iterator<Item = u32>, job_id: u32) {
145 self.inner
146 .lock()
147 .add_fragment_job_mapping(fragment_ids, job_id);
148 }
149
150 pub fn next_generation(&self, job_ids: impl Iterator<Item = u32>) -> u64 {
151 let mut inner = self.inner.lock();
152 let generation = inner.next_generation;
153 inner.next_generation += 1;
154 for job_id in job_ids {
155 inner.update_split_assignment_generation(job_id, generation);
156 }
157 generation
158 }
159
160 pub fn list_cdc_progress(&self) -> HashMap<u32, CdcProgress> {
161 self.inner.lock().list_cdc_progress()
162 }
163
164 pub fn completed_job_ids(&self) -> HashSet<u32> {
165 self.inner.lock().completed_job_ids()
166 }
167}
168
169struct CdcTableBackfillTrackerInner {
170 cdc_progress: HashMap<u32, CdcProgress>,
171 next_generation: u64,
172 fragment_id_to_job_id: HashMap<u32, u32>,
173}
174
175impl CdcTableBackfillTrackerInner {
176 async fn new(meta_store: SqlMetaStore) -> MetaResult<Self> {
177 let init_generation = INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID;
181 let restored = restore_progress(&meta_store).await?;
182 let cdc_progress = restored
183 .into_iter()
184 .map(|(job_id, (split_total_count, is_completed))| {
185 (
186 job_id,
187 CdcProgress::restore(split_total_count, init_generation, is_completed),
188 )
189 })
190 .collect();
191 let fragment_id_to_job_id = load_cdc_fragment_table_mapping(&meta_store).await?;
192 let inst = Self {
193 cdc_progress,
194 next_generation: init_generation + 1,
195 fragment_id_to_job_id,
196 };
197 Ok(inst)
198 }
199
200 fn track_new_job(&mut self, job_id: u32, split_count: u64) {
201 self.cdc_progress
202 .insert(job_id, CdcProgress::new_partial(split_count));
203 }
204
205 fn update_split_progress(&mut self, progress: &PbCdcTableBackfillProgress) -> Vec<u32> {
206 let Some(job_id) = self.fragment_id_to_job_id.get(&progress.fragment_id) else {
207 tracing::warn!(
208 fragment_id = progress.fragment_id,
209 "CDC table mapping not found."
210 );
211 return vec![];
212 };
213 tracing::debug!(?progress, "Complete split.");
214 let Some(current_progress) = self.cdc_progress.get_mut(job_id) else {
215 tracing::warn!(job_id, "CDC table current progress not found.");
216 return vec![];
217 };
218 if current_progress.is_completed {
219 return vec![];
220 }
221 let mut pre_completed_jobs = vec![];
222 assert_ne!(
223 progress.generation,
224 INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID
225 );
226 if current_progress.split_assignment_generation == progress.generation {
227 if progress.done {
228 current_progress.split_completed_count += (1 + progress.split_id_end_inclusive
229 - progress.split_id_start_inclusive)
230 as u64;
231 if current_progress.split_completed_count == current_progress.split_total_count {
232 pre_completed_jobs.push(*job_id);
233 }
234 }
235 current_progress.split_backfilled_count +=
236 (1 + progress.split_id_end_inclusive - progress.split_id_start_inclusive) as u64;
237 }
238 pre_completed_jobs
239 }
240
241 fn complete_job(&mut self, job_id: TableId) {
242 if let Some(p) = self.cdc_progress.get_mut(&job_id.table_id) {
243 p.is_completed = true;
244 } else {
245 tracing::warn!(?job_id, "CDC table current progress not found.");
246 }
247 }
248
249 fn update_split_assignment_generation(&mut self, job_id: u32, generation: u64) {
250 if let Some(p) = self.cdc_progress.get_mut(&job_id) {
251 p.split_assignment_generation = generation;
252 p.split_backfilled_count = 0;
253 p.split_completed_count = 0;
254 } else {
255 tracing::warn!(job_id, generation, "CDC table current progress not found.");
256 }
257 }
258
259 fn add_fragment_job_mapping(&mut self, fragment_ids: impl Iterator<Item = u32>, job_id: u32) {
260 for fragment_id in fragment_ids {
261 self.fragment_id_to_job_id.insert(fragment_id, job_id);
262 }
263 }
264
265 fn list_cdc_progress(&self) -> HashMap<u32, CdcProgress> {
266 self.cdc_progress
267 .iter()
268 .map(|(job_id, progress)| {
269 if progress.is_completed {
270 (
273 *job_id,
274 CdcProgress {
275 split_total_count: progress.split_total_count,
276 split_backfilled_count: progress.split_total_count,
277 split_completed_count: progress.split_total_count,
278 split_assignment_generation: progress.split_assignment_generation,
279 is_completed: true,
280 },
281 )
282 } else {
283 (*job_id, progress.clone())
284 }
285 })
286 .collect()
287 }
288
289 fn completed_job_ids(&self) -> HashSet<u32> {
290 self.cdc_progress
291 .iter()
292 .filter_map(
293 |(job_id, p)| {
294 if p.is_completed { Some(*job_id) } else { None }
295 },
296 )
297 .collect()
298 }
299}
300
301async fn restore_progress(meta_store: &SqlMetaStore) -> MetaResult<HashMap<u32, (u64, bool)>> {
302 let split_progress: Vec<(i32, i64, i16)> = cdc_table_snapshot_split::Entity::find()
303 .select_only()
304 .column(cdc_table_snapshot_split::Column::TableId)
305 .column_as(
306 cdc_table_snapshot_split::Column::TableId.count(),
307 "split_total_count",
308 )
309 .column_as(
310 cdc_table_snapshot_split::Column::IsBackfillFinished.max(),
313 "split_completed_count",
314 )
315 .group_by(cdc_table_snapshot_split::Column::TableId)
316 .into_tuple()
317 .all(&meta_store.conn)
318 .await?;
319 split_progress
320 .into_iter()
321 .map(|(job_id, split_total_count, split_completed_count)| {
322 assert!(
323 split_completed_count <= 1,
324 "split_completed_count = {}",
325 split_completed_count
326 );
327 let is_backfill_finished = split_completed_count == 1;
328 if is_backfill_finished && split_total_count != 1 {
329 tracing::error!(
332 job_id,
333 split_total_count,
334 split_completed_count,
335 "unexpected split count"
336 );
337 Err(anyhow!(format!("unexpected split count:job_id={job_id}, split_total_count={split_total_count}, split_completed_count={split_completed_count}")).into())
338 } else {
339 Ok((
340 u32::try_from(job_id).unwrap(),
341 (
342 u64::try_from(split_total_count).unwrap(),
343 is_backfill_finished,
344 ),
345 ))
346 }
347 })
348 .try_collect()
349}
350
351async fn load_cdc_fragment_table_mapping(
352 meta_store: &SqlMetaStore,
353) -> MetaResult<HashMap<u32, u32>> {
354 use risingwave_common::catalog::FragmentTypeMask;
355 use risingwave_meta_model::prelude::Fragment as FragmentModel;
356 let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
357 .select_only()
358 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
359 .filter(FragmentTypeMask::intersects(
360 FragmentTypeFlag::StreamCdcScan,
361 ))
362 .into_tuple()
363 .all(&meta_store.conn)
364 .await?;
365 Ok(fragment_jobs
366 .into_iter()
367 .map(|(k, v)| (u32::try_from(k).unwrap(), u32::try_from(v).unwrap()))
368 .collect())
369}
370
#[cfg(test)]
mod test {
    use std::iter;

    use risingwave_pb::stream_service::BarrierCompleteResponse;
    use risingwave_pb::stream_service::barrier_complete_response::CdcTableBackfillProgress;

    use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
    use crate::manager::MetaSrvEnv;

    /// Returns a copy of the progress tracked for `table_id`; panics if the job is untracked.
    fn progress_of(tracker: &CdcTableBackfillTracker, table_id: u32) -> CdcProgress {
        tracker
            .inner
            .lock()
            .cdc_progress
            .get(&table_id)
            .cloned()
            .unwrap()
    }

    /// Drives one job through two split assignment generations and checks that only reports
    /// tagged with the current generation count toward completion.
    #[tokio::test]
    async fn test_generation() {
        let env = MetaSrvEnv::for_test().await;
        let tracker = CdcTableBackfillTracker::new(env.meta_store())
            .await
            .unwrap();
        assert_eq!(tracker.inner.lock().next_generation, 2);

        let table_id = 123;
        let split_count = 10;
        tracker.track_new_job(table_id, split_count);
        tracker.add_fragment_table_mapping([11, 12, 13].into_iter(), table_id);

        // Generation 2: splits 1-2 and 5-10 finish (8 of 10), so the job is not complete yet.
        let generation = tracker.next_generation(iter::once(table_id));
        assert_eq!(generation, 2);
        assert_init_state(&tracker, table_id, generation, split_count);
        let partial_round = BarrierCompleteResponse {
            cdc_table_backfill_progress: vec![
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 1,
                    split_id_end_inclusive: 2,
                    generation,
                    fragment_id: 12,
                    ..Default::default()
                },
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 5,
                    split_id_end_inclusive: 10,
                    generation,
                    fragment_id: 11,
                    ..Default::default()
                },
            ],
            ..Default::default()
        };
        assert!(
            tracker
                .apply_collected_command(iter::once(&partial_round))
                .is_empty()
        );
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 8);

        // Generation 3 resets the counters; a report still tagged with generation 2 is ignored.
        let generation = tracker.next_generation(iter::once(table_id));
        assert_eq!(generation, 3);
        assert_init_state(&tracker, table_id, generation, split_count);
        let stale_round = BarrierCompleteResponse {
            cdc_table_backfill_progress: vec![CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 3,
                split_id_end_inclusive: 4,
                generation: generation - 1,
                fragment_id: 13,
                ..Default::default()
            }],
            ..Default::default()
        };
        assert!(
            tracker
                .apply_collected_command(iter::once(&stale_round))
                .is_empty()
        );
        assert_init_state(&tracker, table_id, generation, split_count);
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 0);

        // All ten splits finish under generation 3, so the job becomes pre-completed.
        let full_round = BarrierCompleteResponse {
            cdc_table_backfill_progress: vec![
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 1,
                    split_id_end_inclusive: 2,
                    generation,
                    fragment_id: 12,
                    ..Default::default()
                },
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 5,
                    split_id_end_inclusive: 10,
                    generation,
                    fragment_id: 11,
                    ..Default::default()
                },
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 3,
                    split_id_end_inclusive: 4,
                    generation,
                    fragment_id: 13,
                    ..Default::default()
                },
            ],
            ..Default::default()
        };
        let completed = tracker.apply_collected_command(iter::once(&full_round));
        assert_eq!(completed, vec![table_id.into()]);
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 10);
    }

    /// Asserts that `table_id` is bound to `generation`, has `split_count` splits in total,
    /// and has no completed splits yet.
    fn assert_init_state(
        tracker: &CdcTableBackfillTracker,
        table_id: u32,
        generation: u64,
        split_count: u64,
    ) {
        let progress = progress_of(tracker, table_id);
        assert_eq!(progress.split_assignment_generation, generation);
        assert_eq!(progress.split_total_count, split_count);
        assert_eq!(progress.split_completed_count, 0);
    }
}