1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::Mutex;
20use risingwave_common::catalog::FragmentTypeFlag;
21use risingwave_common::id::JobId;
22use risingwave_common::row::{OwnedRow, Row};
23use risingwave_connector::source::cdc::external::CDC_TABLE_SPLIT_ID_START;
24use risingwave_connector::source::cdc::{
25 INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID, INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
26};
27use risingwave_meta_model::{FragmentId, cdc_table_snapshot_split, fragment};
28use risingwave_pb::stream_service::PbBarrierCompleteResponse;
29use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
30use sea_orm::prelude::Expr;
31use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, TransactionTrait};
32
33use crate::MetaResult;
34use crate::controller::SqlMetaStore;
35use crate::controller::fragment::FragmentTypeMaskExt;
36
/// Shared, reference-counted handle to a [`CdcTableBackfillTracker`].
pub type CdcTableBackfillTrackerRef = Arc<CdcTableBackfillTracker>;

/// Tracks snapshot-backfill progress of CDC table jobs.
///
/// Progress reports are fed in from barrier-complete responses via `apply_collected_command`;
/// completion of a job is persisted to the meta store by `complete_job`.
pub struct CdcTableBackfillTracker {
    // Meta store handle; `complete_job` uses it to update/delete rows of
    // `cdc_table_snapshot_split` in a transaction.
    meta_store: SqlMetaStore,
    // In-memory progress state, guarded by a `parking_lot` mutex.
    inner: Mutex<CdcTableBackfillTrackerInner>,
}
43
/// Snapshot-backfill progress of a single CDC table job, measured in splits.
///
/// The split counters are scoped to one split assignment generation: progress reports tagged
/// with another generation are ignored, and the counters are reset whenever the job is moved
/// to a new generation.
#[derive(Clone, Debug)]
pub struct CdcProgress {
    /// Total number of snapshot splits of the job.
    pub split_total_count: u64,
    /// Number of splits covered by progress reports in the current generation, whether or not
    /// those splits were reported as done.
    pub split_backfilled_count: u64,
    /// Number of splits reported as done in the current generation.
    pub split_completed_count: u64,
    /// Split assignment generation the counters above refer to.
    split_assignment_generation: u64,
    /// Whether the job's backfill has been marked completed (restored from the meta store or set
    /// after `complete_job` persisted it).
    is_completed: bool,
}
56
57impl CdcProgress {
58 fn restore(split_total_count: u64, generation: u64, is_completed: bool) -> Self {
59 Self {
60 split_total_count,
61 split_backfilled_count: 0,
62 split_completed_count: 0,
63 split_assignment_generation: generation,
64 is_completed,
65 }
66 }
67
68 fn new_partial(split_total_count: u64) -> Self {
70 Self {
71 split_total_count,
72 split_backfilled_count: 0,
73 split_completed_count: 0,
74 split_assignment_generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
75 is_completed: false,
76 }
77 }
78}
79
80impl CdcTableBackfillTracker {
81 pub async fn new(meta_store: SqlMetaStore) -> MetaResult<Self> {
82 let inner = CdcTableBackfillTrackerInner::new(meta_store.clone()).await?;
83 let inst = Self {
84 meta_store,
85 inner: Mutex::new(inner),
86 };
87 Ok(inst)
88 }
89
90 pub fn apply_collected_command(
91 &self,
92 resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
93 ) -> Vec<JobId> {
94 let mut inner = self.inner.lock();
95 let mut pre_completed_jobs = vec![];
96 for resp in resps {
97 for progress in &resp.cdc_table_backfill_progress {
98 pre_completed_jobs.extend(inner.update_split_progress(progress));
99 }
100 }
101 pre_completed_jobs
102 }
103
104 pub async fn complete_job(&self, job_id: JobId) -> MetaResult<()> {
105 let txn = self.meta_store.conn.begin().await?;
106 let bound = OwnedRow::new(vec![None]).value_serialize();
108 cdc_table_snapshot_split::Entity::update_many()
109 .col_expr(
110 cdc_table_snapshot_split::Column::IsBackfillFinished,
111 Expr::value(1),
112 )
113 .col_expr(
114 cdc_table_snapshot_split::Column::Left,
115 Expr::value(bound.clone()),
116 )
117 .col_expr(cdc_table_snapshot_split::Column::Right, Expr::value(bound))
118 .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id))
119 .filter(cdc_table_snapshot_split::Column::SplitId.eq(CDC_TABLE_SPLIT_ID_START))
120 .exec(&txn)
121 .await?;
122 cdc_table_snapshot_split::Entity::delete_many()
124 .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id))
125 .filter(cdc_table_snapshot_split::Column::SplitId.gt(CDC_TABLE_SPLIT_ID_START))
126 .exec(&txn)
127 .await?;
128 txn.commit().await?;
129 self.inner.lock().complete_job(job_id);
130 Ok(())
131 }
132
133 pub fn track_new_job(&self, job_id: JobId, split_count: u64) {
136 self.inner.lock().track_new_job(job_id, split_count);
137 }
138
139 pub fn add_fragment_table_mapping(
140 &self,
141 fragment_ids: impl Iterator<Item = FragmentId>,
142 job_id: JobId,
143 ) {
144 self.inner
145 .lock()
146 .add_fragment_job_mapping(fragment_ids, job_id);
147 }
148
149 pub fn next_generation(&self, job_ids: impl Iterator<Item = JobId>) -> u64 {
150 let mut inner = self.inner.lock();
151 let generation = inner.next_generation;
152 inner.next_generation += 1;
153 for job_id in job_ids {
154 inner.update_split_assignment_generation(job_id, generation);
155 }
156 generation
157 }
158
159 pub fn list_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
160 self.inner.lock().list_cdc_progress()
161 }
162
163 pub fn completed_job_ids(&self) -> HashSet<JobId> {
164 self.inner.lock().completed_job_ids()
165 }
166}
167
/// Mutable state of [`CdcTableBackfillTracker`], kept behind its mutex.
struct CdcTableBackfillTrackerInner {
    // Progress of every tracked job, keyed by job id.
    cdc_progress: HashMap<JobId, CdcProgress>,
    // Next split assignment generation to hand out; bumped by `next_generation`.
    next_generation: u64,
    // Maps each CDC scan fragment to its owning job, so that per-fragment progress reports
    // can be attributed to a job.
    fragment_id_to_job_id: HashMap<FragmentId, JobId>,
}
173
174impl CdcTableBackfillTrackerInner {
175 async fn new(meta_store: SqlMetaStore) -> MetaResult<Self> {
176 let init_generation = INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID;
180 let restored = restore_progress(&meta_store).await?;
181 let cdc_progress = restored
182 .into_iter()
183 .map(|(job_id, (split_total_count, is_completed))| {
184 (
185 job_id,
186 CdcProgress::restore(split_total_count, init_generation, is_completed),
187 )
188 })
189 .collect();
190 let fragment_id_to_job_id = load_cdc_fragment_table_mapping(&meta_store).await?;
191 let inst = Self {
192 cdc_progress,
193 next_generation: init_generation + 1,
194 fragment_id_to_job_id,
195 };
196 Ok(inst)
197 }
198
199 fn track_new_job(&mut self, job_id: JobId, split_count: u64) {
200 self.cdc_progress
201 .insert(job_id, CdcProgress::new_partial(split_count));
202 }
203
204 fn update_split_progress(&mut self, progress: &PbCdcTableBackfillProgress) -> Vec<JobId> {
205 let Some(job_id) = self.fragment_id_to_job_id.get(&progress.fragment_id) else {
206 tracing::warn!(
207 fragment_id = %progress.fragment_id,
208 "CDC table mapping not found."
209 );
210 return vec![];
211 };
212 tracing::debug!(?progress, "Complete split.");
213 let Some(current_progress) = self.cdc_progress.get_mut(job_id) else {
214 tracing::warn!(%job_id, "CDC table current progress not found.");
215 return vec![];
216 };
217 if current_progress.is_completed {
218 return vec![];
219 }
220 let mut pre_completed_jobs = vec![];
221 assert_ne!(
222 progress.generation,
223 INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID
224 );
225 if current_progress.split_assignment_generation == progress.generation {
226 if progress.done {
227 current_progress.split_completed_count += (1 + progress.split_id_end_inclusive
228 - progress.split_id_start_inclusive)
229 as u64;
230 if current_progress.split_completed_count == current_progress.split_total_count {
231 pre_completed_jobs.push(*job_id);
232 }
233 }
234 current_progress.split_backfilled_count +=
235 (1 + progress.split_id_end_inclusive - progress.split_id_start_inclusive) as u64;
236 }
237 pre_completed_jobs
238 }
239
240 fn complete_job(&mut self, job_id: JobId) {
241 if let Some(p) = self.cdc_progress.get_mut(&job_id) {
242 p.is_completed = true;
243 } else {
244 tracing::warn!(%job_id, "CDC table current progress not found.");
245 }
246 }
247
248 fn update_split_assignment_generation(&mut self, job_id: JobId, generation: u64) {
249 if let Some(p) = self.cdc_progress.get_mut(&job_id) {
250 p.split_assignment_generation = generation;
251 p.split_backfilled_count = 0;
252 p.split_completed_count = 0;
253 } else {
254 tracing::warn!(%job_id, generation, "CDC table current progress not found.");
255 }
256 }
257
258 fn add_fragment_job_mapping(
259 &mut self,
260 fragment_ids: impl Iterator<Item = FragmentId>,
261 job_id: JobId,
262 ) {
263 for fragment_id in fragment_ids {
264 self.fragment_id_to_job_id.insert(fragment_id, job_id);
265 }
266 }
267
268 fn list_cdc_progress(&self) -> HashMap<JobId, CdcProgress> {
269 self.cdc_progress
270 .iter()
271 .map(|(job_id, progress)| {
272 if progress.is_completed {
273 (
276 *job_id,
277 CdcProgress {
278 split_total_count: progress.split_total_count,
279 split_backfilled_count: progress.split_total_count,
280 split_completed_count: progress.split_total_count,
281 split_assignment_generation: progress.split_assignment_generation,
282 is_completed: true,
283 },
284 )
285 } else {
286 (*job_id, progress.clone())
287 }
288 })
289 .collect()
290 }
291
292 fn completed_job_ids(&self) -> HashSet<JobId> {
293 self.cdc_progress
294 .iter()
295 .filter_map(
296 |(job_id, p)| {
297 if p.is_completed { Some(*job_id) } else { None }
298 },
299 )
300 .collect()
301 }
302}
303
304async fn restore_progress(meta_store: &SqlMetaStore) -> MetaResult<HashMap<JobId, (u64, bool)>> {
305 let split_progress: Vec<(JobId, i64, i16)> = cdc_table_snapshot_split::Entity::find()
306 .select_only()
307 .column(cdc_table_snapshot_split::Column::TableId)
308 .column_as(
309 cdc_table_snapshot_split::Column::TableId.count(),
310 "split_total_count",
311 )
312 .column_as(
313 cdc_table_snapshot_split::Column::IsBackfillFinished.max(),
316 "split_completed_count",
317 )
318 .group_by(cdc_table_snapshot_split::Column::TableId)
319 .into_tuple()
320 .all(&meta_store.conn)
321 .await?;
322 split_progress
323 .into_iter()
324 .map(|(job_id, split_total_count, split_completed_count)| {
325 assert!(
326 split_completed_count <= 1,
327 "split_completed_count = {}",
328 split_completed_count
329 );
330 let is_backfill_finished = split_completed_count == 1;
331 if is_backfill_finished && split_total_count != 1 {
332 tracing::error!(
335 %job_id,
336 split_total_count,
337 split_completed_count,
338 "unexpected split count"
339 );
340 Err(anyhow!(format!("unexpected split count:job_id={job_id}, split_total_count={split_total_count}, split_completed_count={split_completed_count}")).into())
341 } else {
342 Ok((
343 job_id,
344 (
345 u64::try_from(split_total_count).unwrap(),
346 is_backfill_finished,
347 ),
348 ))
349 }
350 })
351 .try_collect()
352}
353
354async fn load_cdc_fragment_table_mapping(
355 meta_store: &SqlMetaStore,
356) -> MetaResult<HashMap<FragmentId, JobId>> {
357 use risingwave_common::catalog::FragmentTypeMask;
358 use risingwave_meta_model::prelude::Fragment as FragmentModel;
359 let fragment_jobs: Vec<(FragmentId, JobId)> = FragmentModel::find()
360 .select_only()
361 .columns([fragment::Column::FragmentId, fragment::Column::JobId])
362 .filter(FragmentTypeMask::intersects(
363 FragmentTypeFlag::StreamCdcScan,
364 ))
365 .into_tuple()
366 .all(&meta_store.conn)
367 .await?;
368 Ok(fragment_jobs.into_iter().collect())
369}
370
#[cfg(test)]
mod test {
    use std::iter;

    use risingwave_common::id::JobId;
    use risingwave_pb::stream_service::BarrierCompleteResponse;
    use risingwave_pb::stream_service::barrier_complete_response::CdcTableBackfillProgress;

    use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
    use crate::manager::MetaSrvEnv;

    /// Clones the tracked progress of `table_id`, panicking if the job is not tracked.
    fn progress_of(tracker: &CdcTableBackfillTracker, table_id: JobId) -> CdcProgress {
        tracker
            .inner
            .lock()
            .cdc_progress
            .get(&table_id)
            .cloned()
            .unwrap()
    }

    #[tokio::test]
    async fn test_generation() {
        let env = MetaSrvEnv::for_test().await;
        let tracker = CdcTableBackfillTracker::new(env.meta_store()).await.unwrap();
        assert_eq!(tracker.inner.lock().next_generation, 2);

        let table_id: JobId = 123.into();
        let split_count = 10;
        tracker.track_new_job(table_id, split_count);
        tracker.add_fragment_table_mapping(
            [11.into(), 12.into(), 13.into()].into_iter(),
            table_id,
        );

        // Generation 2: splits 1..=2 and 5..=10 finish, 3..=4 stay outstanding.
        let generation = tracker.next_generation(iter::once(table_id));
        assert_eq!(generation, 2);
        assert_init_state(&tracker, table_id, generation, split_count);
        let barrier_complete = BarrierCompleteResponse {
            cdc_table_backfill_progress: vec![
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 1,
                    split_id_end_inclusive: 2,
                    generation,
                    fragment_id: 12.into(),
                    ..Default::default()
                },
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 5,
                    split_id_end_inclusive: 10,
                    generation,
                    fragment_id: 11.into(),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };
        let completed = tracker.apply_collected_command(iter::once(&barrier_complete));
        assert!(completed.is_empty());
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 8);

        // Generation 3 resets the counters; a stale report from generation 2 is ignored.
        let generation = tracker.next_generation(iter::once(table_id));
        assert_eq!(generation, 3);
        assert_init_state(&tracker, table_id, generation, split_count);
        let barrier_complete = BarrierCompleteResponse {
            cdc_table_backfill_progress: vec![CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 3,
                split_id_end_inclusive: 4,
                generation: generation - 1,
                fragment_id: 13.into(),
                ..Default::default()
            }],
            ..Default::default()
        };
        let completed = tracker.apply_collected_command(iter::once(&barrier_complete));
        assert!(completed.is_empty());
        assert_init_state(&tracker, table_id, generation, split_count);
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 0);

        // All ten splits reported done in the current generation complete the job.
        let barrier_complete = BarrierCompleteResponse {
            cdc_table_backfill_progress: vec![
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 1,
                    split_id_end_inclusive: 2,
                    generation,
                    fragment_id: 12.into(),
                    ..Default::default()
                },
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 5,
                    split_id_end_inclusive: 10,
                    generation,
                    fragment_id: 11.into(),
                    ..Default::default()
                },
                CdcTableBackfillProgress {
                    done: true,
                    split_id_start_inclusive: 3,
                    split_id_end_inclusive: 4,
                    generation,
                    fragment_id: 13.into(),
                    ..Default::default()
                },
            ],
            ..Default::default()
        };
        let completed = tracker.apply_collected_command(iter::once(&barrier_complete));
        assert_eq!(completed, vec![table_id]);
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 10);
    }

    /// Asserts that `table_id` sits on `generation` with `split_count` splits and nothing
    /// completed yet.
    fn assert_init_state(
        tracker: &CdcTableBackfillTracker,
        table_id: JobId,
        generation: u64,
        split_count: u64,
    ) {
        let progress = progress_of(tracker, table_id);
        assert_eq!(progress.split_assignment_generation, generation);
        assert_eq!(progress.split_total_count, split_count);
        assert_eq!(progress.split_completed_count, 0);
    }
}