risingwave_meta/barrier/
cdc_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17
18use anyhow::anyhow;
19use parking_lot::Mutex;
20use risingwave_common::catalog::{FragmentTypeFlag, TableId};
21use risingwave_common::row::{OwnedRow, Row};
22use risingwave_connector::source::cdc::external::CDC_TABLE_SPLIT_ID_START;
23use risingwave_connector::source::cdc::{
24    INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID, INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
25};
26use risingwave_meta_model::{FragmentId, ObjectId, cdc_table_snapshot_split, fragment};
27use risingwave_pb::stream_service::PbBarrierCompleteResponse;
28use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
29use sea_orm::prelude::Expr;
30use sea_orm::{ColumnTrait, EntityTrait, QueryFilter, QuerySelect, TransactionTrait};
31
32use crate::MetaResult;
33use crate::controller::SqlMetaStore;
34use crate::controller::fragment::FragmentTypeMaskExt;
35
36pub type CdcTableBackfillTrackerRef = Arc<CdcTableBackfillTracker>;
37
38pub struct CdcTableBackfillTracker {
39    meta_store: SqlMetaStore,
40    inner: Mutex<CdcTableBackfillTrackerInner>,
41}
42
/// Snapshot backfill progress of a single CDC table job.
#[derive(Clone, Debug)]
pub struct CdcProgress {
    /// The total number of splits, immutable.
    pub split_total_count: u64,
    /// The number of splits that has completed backfill.
    pub split_backfilled_count: u64,
    /// The number of splits that has completed backfill and synchronized CDC offset.
    pub split_completed_count: u64,
    /// The generation of split assignment. Progress reports carrying any other generation
    /// are not counted.
    split_assignment_generation: u64,
    /// Whether the job has been marked completed.
    is_completed: bool,
}
55
56impl CdcProgress {
57    fn restore(split_total_count: u64, generation: u64, is_completed: bool) -> Self {
58        Self {
59            split_total_count,
60            split_backfilled_count: 0,
61            split_completed_count: 0,
62            split_assignment_generation: generation,
63            is_completed,
64        }
65    }
66
67    /// A generation ID must be assigned before it can be used.
68    fn new_partial(split_total_count: u64) -> Self {
69        Self {
70            split_total_count,
71            split_backfilled_count: 0,
72            split_completed_count: 0,
73            split_assignment_generation: INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
74            is_completed: false,
75        }
76    }
77}
78
79impl CdcTableBackfillTracker {
80    pub async fn new(meta_store: SqlMetaStore) -> MetaResult<Self> {
81        let inner = CdcTableBackfillTrackerInner::new(meta_store.clone()).await?;
82        let inst = Self {
83            meta_store,
84            inner: Mutex::new(inner),
85        };
86        Ok(inst)
87    }
88
89    pub fn apply_collected_command(
90        &self,
91        resps: impl IntoIterator<Item = &PbBarrierCompleteResponse>,
92    ) -> Vec<TableId> {
93        let mut inner = self.inner.lock();
94        let mut pre_completed_jobs = vec![];
95        for resp in resps {
96            for progress in &resp.cdc_table_backfill_progress {
97                pre_completed_jobs.extend(inner.update_split_progress(progress));
98            }
99        }
100        pre_completed_jobs.into_iter().map(Into::into).collect()
101    }
102
103    pub async fn complete_job(&self, job_id: TableId) -> MetaResult<()> {
104        let txn = self.meta_store.conn.begin().await?;
105        // Rewrite the first split as [inf, inf].
106        let bound = OwnedRow::new(vec![None]).value_serialize();
107        cdc_table_snapshot_split::Entity::update_many()
108            .col_expr(
109                cdc_table_snapshot_split::Column::IsBackfillFinished,
110                Expr::value(1),
111            )
112            .col_expr(
113                cdc_table_snapshot_split::Column::Left,
114                Expr::value(bound.clone()),
115            )
116            .col_expr(cdc_table_snapshot_split::Column::Right, Expr::value(bound))
117            .filter(
118                cdc_table_snapshot_split::Column::TableId
119                    .eq(job_id.table_id as risingwave_meta_model::TableId),
120            )
121            .filter(cdc_table_snapshot_split::Column::SplitId.eq(CDC_TABLE_SPLIT_ID_START))
122            .exec(&txn)
123            .await?;
124        // Keep only the first split.
125        cdc_table_snapshot_split::Entity::delete_many()
126            .filter(
127                cdc_table_snapshot_split::Column::TableId
128                    .eq(job_id.table_id as risingwave_meta_model::TableId),
129            )
130            .filter(cdc_table_snapshot_split::Column::SplitId.gt(CDC_TABLE_SPLIT_ID_START))
131            .exec(&txn)
132            .await?;
133        txn.commit().await?;
134        self.inner.lock().complete_job(job_id);
135        Ok(())
136    }
137
138    /// Starts to track the progress for the `job_id`.
139    /// A generation ID must be assigned via `next_generation` before it can be used.
140    pub fn track_new_job(&self, job_id: u32, split_count: u64) {
141        self.inner.lock().track_new_job(job_id, split_count);
142    }
143
144    pub fn add_fragment_table_mapping(&self, fragment_ids: impl Iterator<Item = u32>, job_id: u32) {
145        self.inner
146            .lock()
147            .add_fragment_job_mapping(fragment_ids, job_id);
148    }
149
150    pub fn next_generation(&self, job_ids: impl Iterator<Item = u32>) -> u64 {
151        let mut inner = self.inner.lock();
152        let generation = inner.next_generation;
153        inner.next_generation += 1;
154        for job_id in job_ids {
155            inner.update_split_assignment_generation(job_id, generation);
156        }
157        generation
158    }
159
160    pub fn list_cdc_progress(&self) -> HashMap<u32, CdcProgress> {
161        self.inner.lock().list_cdc_progress()
162    }
163
164    pub fn completed_job_ids(&self) -> HashSet<u32> {
165        self.inner.lock().completed_job_ids()
166    }
167}
168
169struct CdcTableBackfillTrackerInner {
170    cdc_progress: HashMap<u32, CdcProgress>,
171    next_generation: u64,
172    fragment_id_to_job_id: HashMap<u32, u32>,
173}
174
175impl CdcTableBackfillTrackerInner {
176    async fn new(meta_store: SqlMetaStore) -> MetaResult<Self> {
177        // The generation only resets after meta node is restarted.
178        // The barrier carrying expired generation will be rejected by the restarted meta node.
179        // Thus the invalid progress won't be applied to the tracker.
180        let init_generation = INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID;
181        let restored = restore_progress(&meta_store).await?;
182        let cdc_progress = restored
183            .into_iter()
184            .map(|(job_id, (split_total_count, is_completed))| {
185                (
186                    job_id,
187                    CdcProgress::restore(split_total_count, init_generation, is_completed),
188                )
189            })
190            .collect();
191        let fragment_id_to_job_id = load_cdc_fragment_table_mapping(&meta_store).await?;
192        let inst = Self {
193            cdc_progress,
194            next_generation: init_generation + 1,
195            fragment_id_to_job_id,
196        };
197        Ok(inst)
198    }
199
200    fn track_new_job(&mut self, job_id: u32, split_count: u64) {
201        self.cdc_progress
202            .insert(job_id, CdcProgress::new_partial(split_count));
203    }
204
205    fn update_split_progress(&mut self, progress: &PbCdcTableBackfillProgress) -> Vec<u32> {
206        let Some(job_id) = self.fragment_id_to_job_id.get(&progress.fragment_id) else {
207            tracing::warn!(
208                fragment_id = progress.fragment_id,
209                "CDC table mapping not found."
210            );
211            return vec![];
212        };
213        tracing::debug!(?progress, "Complete split.");
214        let Some(current_progress) = self.cdc_progress.get_mut(job_id) else {
215            tracing::warn!(job_id, "CDC table current progress not found.");
216            return vec![];
217        };
218        if current_progress.is_completed {
219            return vec![];
220        }
221        let mut pre_completed_jobs = vec![];
222        assert_ne!(
223            progress.generation,
224            INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID
225        );
226        if current_progress.split_assignment_generation == progress.generation {
227            if progress.done {
228                current_progress.split_completed_count += (1 + progress.split_id_end_inclusive
229                    - progress.split_id_start_inclusive)
230                    as u64;
231                if current_progress.split_completed_count == current_progress.split_total_count {
232                    pre_completed_jobs.push(*job_id);
233                }
234            }
235            current_progress.split_backfilled_count +=
236                (1 + progress.split_id_end_inclusive - progress.split_id_start_inclusive) as u64;
237        }
238        pre_completed_jobs
239    }
240
241    fn complete_job(&mut self, job_id: TableId) {
242        if let Some(p) = self.cdc_progress.get_mut(&job_id.table_id) {
243            p.is_completed = true;
244        } else {
245            tracing::warn!(?job_id, "CDC table current progress not found.");
246        }
247    }
248
249    fn update_split_assignment_generation(&mut self, job_id: u32, generation: u64) {
250        if let Some(p) = self.cdc_progress.get_mut(&job_id) {
251            p.split_assignment_generation = generation;
252            p.split_backfilled_count = 0;
253            p.split_completed_count = 0;
254        } else {
255            tracing::warn!(job_id, generation, "CDC table current progress not found.");
256        }
257    }
258
259    fn add_fragment_job_mapping(&mut self, fragment_ids: impl Iterator<Item = u32>, job_id: u32) {
260        for fragment_id in fragment_ids {
261            self.fragment_id_to_job_id.insert(fragment_id, job_id);
262        }
263    }
264
265    fn list_cdc_progress(&self) -> HashMap<u32, CdcProgress> {
266        self.cdc_progress
267            .iter()
268            .map(|(job_id, progress)| {
269                if progress.is_completed {
270                    // The progress of a completed job won't be set by try_complete_split correctly.
271                    // Instead, assign it an arbitrary value.
272                    (
273                        *job_id,
274                        CdcProgress {
275                            split_total_count: progress.split_total_count,
276                            split_backfilled_count: progress.split_total_count,
277                            split_completed_count: progress.split_total_count,
278                            split_assignment_generation: progress.split_assignment_generation,
279                            is_completed: true,
280                        },
281                    )
282                } else {
283                    (*job_id, progress.clone())
284                }
285            })
286            .collect()
287    }
288
289    fn completed_job_ids(&self) -> HashSet<u32> {
290        self.cdc_progress
291            .iter()
292            .filter_map(
293                |(job_id, p)| {
294                    if p.is_completed { Some(*job_id) } else { None }
295                },
296            )
297            .collect()
298    }
299}
300
301async fn restore_progress(meta_store: &SqlMetaStore) -> MetaResult<HashMap<u32, (u64, bool)>> {
302    let split_progress: Vec<(i32, i64, i16)> = cdc_table_snapshot_split::Entity::find()
303        .select_only()
304        .column(cdc_table_snapshot_split::Column::TableId)
305        .column_as(
306            cdc_table_snapshot_split::Column::TableId.count(),
307            "split_total_count",
308        )
309        .column_as(
310            // The sum of PG and MySQL behaves differently in terms of returning type and casting.
311            // Should use sum here but currently use max instead to work around compatibility issue.
312            cdc_table_snapshot_split::Column::IsBackfillFinished.max(),
313            "split_completed_count",
314        )
315        .group_by(cdc_table_snapshot_split::Column::TableId)
316        .into_tuple()
317        .all(&meta_store.conn)
318        .await?;
319    split_progress
320        .into_iter()
321        .map(|(job_id, split_total_count, split_completed_count)| {
322            assert!(
323                split_completed_count <= 1,
324                "split_completed_count = {}",
325                split_completed_count
326            );
327            let is_backfill_finished = split_completed_count == 1;
328            if is_backfill_finished && split_total_count != 1 {
329                // CdcTableBackfillTracker::complete_job rewrites splits in a transaction.
330                // This error should only happen when the meta store reads uncommitted data.
331                tracing::error!(
332                    job_id,
333                    split_total_count,
334                    split_completed_count,
335                    "unexpected split count"
336                );
337                Err(anyhow!(format!("unexpected split count:job_id={job_id}, split_total_count={split_total_count}, split_completed_count={split_completed_count}")).into())
338            } else {
339                Ok((
340                    u32::try_from(job_id).unwrap(),
341                    (
342                        u64::try_from(split_total_count).unwrap(),
343                        is_backfill_finished,
344                    ),
345                ))
346            }
347        })
348        .try_collect()
349}
350
351async fn load_cdc_fragment_table_mapping(
352    meta_store: &SqlMetaStore,
353) -> MetaResult<HashMap<u32, u32>> {
354    use risingwave_common::catalog::FragmentTypeMask;
355    use risingwave_meta_model::prelude::Fragment as FragmentModel;
356    let fragment_jobs: Vec<(FragmentId, ObjectId)> = FragmentModel::find()
357        .select_only()
358        .columns([fragment::Column::FragmentId, fragment::Column::JobId])
359        .filter(FragmentTypeMask::intersects(
360            FragmentTypeFlag::StreamCdcScan,
361        ))
362        .into_tuple()
363        .all(&meta_store.conn)
364        .await?;
365    Ok(fragment_jobs
366        .into_iter()
367        .map(|(k, v)| (u32::try_from(k).unwrap(), u32::try_from(v).unwrap()))
368        .collect())
369}
370
#[cfg(test)]
mod test {
    use std::iter;

    use risingwave_pb::stream_service::BarrierCompleteResponse;
    use risingwave_pb::stream_service::barrier_complete_response::CdcTableBackfillProgress;

    use crate::barrier::cdc_progress::{CdcProgress, CdcTableBackfillTracker};
    use crate::manager::MetaSrvEnv;

    #[tokio::test]
    async fn test_generation() {
        let env = MetaSrvEnv::for_test().await;
        let tracker = CdcTableBackfillTracker::new(env.meta_store()).await.unwrap();
        assert_eq!(tracker.inner.lock().next_generation, 2);

        let table_id = 123;
        let split_count = 10;
        tracker.track_new_job(table_id, split_count);
        tracker.add_fragment_table_mapping([11, 12, 13].into_iter(), table_id);

        // A finished report for splits `start..=end` sent by `fragment_id` under `generation_id`.
        let done = |start, end, generation_id, fragment_id| CdcTableBackfillProgress {
            done: true,
            split_id_start_inclusive: start,
            split_id_end_inclusive: end,
            generation: generation_id,
            fragment_id,
            ..Default::default()
        };
        // Wraps progress reports into a barrier complete response.
        let report = |progress: Vec<CdcTableBackfillProgress>| BarrierCompleteResponse {
            cdc_table_backfill_progress: progress,
            ..Default::default()
        };

        // First generation: 8 of the 10 splits finish, so nothing completes yet.
        let generation = tracker.next_generation([table_id].into_iter());
        assert_eq!(generation, 2);
        assert_init_state(&tracker, table_id, generation, split_count);
        let completed = tracker.apply_collected_command(iter::once(&report(vec![
            done(1, 2, generation, 12),
            done(5, 10, generation, 11),
        ])));
        assert!(completed.is_empty());
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 8);

        // Reset generation; a report carrying the expired generation is ignored.
        let generation = tracker.next_generation([table_id].into_iter());
        assert_eq!(generation, 3);
        assert_init_state(&tracker, table_id, generation, split_count);
        let completed = tracker
            .apply_collected_command(iter::once(&report(vec![done(3, 4, generation - 1, 13)])));
        assert!(completed.is_empty());
        assert_init_state(&tracker, table_id, generation, split_count);
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 0);

        // All 10 splits finish under the current generation, so the job pre-completes.
        let completed = tracker.apply_collected_command(iter::once(&report(vec![
            done(1, 2, generation, 12),
            done(5, 10, generation, 11),
            done(3, 4, generation, 13),
        ])));
        assert_eq!(completed, vec![table_id.into()]);
        assert_eq!(progress_of(&tracker, table_id).split_completed_count, 10);
    }

    /// Returns a copy of the tracked progress of `table_id`; panics if it is not tracked.
    fn progress_of(tracker: &CdcTableBackfillTracker, table_id: u32) -> CdcProgress {
        tracker
            .inner
            .lock()
            .cdc_progress
            .get(&table_id)
            .cloned()
            .unwrap()
    }

    /// Asserts that `table_id` is under `generation`, has `split_count` splits in total and
    /// no completed split.
    fn assert_init_state(
        tracker: &CdcTableBackfillTracker,
        table_id: u32,
        generation: u64,
        split_count: u64,
    ) {
        let progress = progress_of(tracker, table_id);
        assert_eq!(progress.split_assignment_generation, generation);
        assert_eq!(progress.split_total_count, split_count);
        assert_eq!(progress.split_completed_count, 0);
    }
}