// risingwave_meta/barrier/cdc_progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{HashMap, HashSet};
16use std::sync::LazyLock;
17
18use risingwave_common::id::JobId;
19use risingwave_common::row::{OwnedRow, Row};
20use risingwave_connector::source::CdcTableSnapshotSplitRaw;
21use risingwave_connector::source::cdc::external::CDC_TABLE_SPLIT_ID_START;
22use risingwave_connector::source::cdc::{
23    INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID, INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
24};
25use risingwave_meta_model::{FragmentId, cdc_table_snapshot_split};
26use risingwave_pb::id::ActorId;
27use risingwave_pb::source::PbCdcTableSnapshotSplits;
28use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
29use sea_orm::prelude::Expr;
30use sea_orm::{ColumnTrait, ConnectionTrait, EntityTrait, QueryFilter};
31
32use crate::MetaResult;
33use crate::stream::cdc::{
34    CdcTableSnapshotSplits, assign_cdc_table_snapshot_splits, single_merged_split,
35};
36
37#[derive(Debug)]
38enum CdcBackfillStatus {
39    Backfilling(CdcBackfillProgress),
40    PreCompleted,
41    Completed,
42}
43
44#[derive(Debug)]
45struct CdcBackfillProgress {
46    splits: Vec<CdcTableSnapshotSplitRaw>,
47    /// The number of splits that has completed backfill.
48    split_backfilled_count: u64,
49    /// The number of splits that has completed backfill and synchronized CDC offset.
50    split_completed_count: u64,
51    /// The generation of split assignment.
52    split_assignment_generation: u64,
53}
54
/// A point-in-time snapshot of a CDC table's backfill counters, as returned by
/// `CdcTableBackfillTracker::gen_cdc_progress`.
#[derive(Debug)]
pub struct CdcProgress {
    /// Total number of snapshot splits being reported on.
    pub split_total_count: u64,
    /// Number of splits whose snapshot backfill has finished.
    pub split_backfilled_count: u64,
    /// Number of splits that have finished backfill and also synchronized their CDC offset.
    pub split_completed_count: u64,
}
64
65impl CdcTableBackfillTracker {
66    pub async fn mark_complete_job(txn: &impl ConnectionTrait, job_id: JobId) -> MetaResult<()> {
67        // Rewrite the first split as [inf, inf].
68        let bound = OwnedRow::new(vec![None]).value_serialize();
69        cdc_table_snapshot_split::Entity::update_many()
70            .col_expr(
71                cdc_table_snapshot_split::Column::IsBackfillFinished,
72                Expr::value(1),
73            )
74            .col_expr(
75                cdc_table_snapshot_split::Column::Left,
76                Expr::value(bound.clone()),
77            )
78            .col_expr(cdc_table_snapshot_split::Column::Right, Expr::value(bound))
79            .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id))
80            .filter(cdc_table_snapshot_split::Column::SplitId.eq(CDC_TABLE_SPLIT_ID_START))
81            .exec(txn)
82            .await?;
83        // Keep only the first split.
84        cdc_table_snapshot_split::Entity::delete_many()
85            .filter(cdc_table_snapshot_split::Column::TableId.eq(job_id))
86            .filter(cdc_table_snapshot_split::Column::SplitId.gt(CDC_TABLE_SPLIT_ID_START))
87            .exec(txn)
88            .await?;
89        Ok(())
90    }
91}
92
93#[derive(Debug)]
94pub(super) struct CdcTableBackfillTracker {
95    status: CdcBackfillStatus,
96    cdc_scan_fragment_id: FragmentId,
97    next_generation: u64,
98}
99
100impl CdcTableBackfillTracker {
101    fn new_inner(cdc_scan_fragment_id: FragmentId, splits: CdcTableSnapshotSplits) -> Self {
102        let status = match splits {
103            CdcTableSnapshotSplits::Backfilling(splits) => {
104                CdcBackfillStatus::Backfilling(CdcBackfillProgress {
105                    splits,
106                    split_backfilled_count: 0,
107                    split_completed_count: 0,
108                    split_assignment_generation: INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID,
109                })
110            }
111            CdcTableSnapshotSplits::Finished => CdcBackfillStatus::Completed,
112        };
113        Self {
114            status,
115            cdc_scan_fragment_id,
116            next_generation: INITIAL_CDC_SPLIT_ASSIGNMENT_GENERATION_ID + 1,
117        }
118    }
119
120    pub fn restore(cdc_scan_fragment_id: FragmentId, splits: CdcTableSnapshotSplits) -> Self {
121        Self::new_inner(cdc_scan_fragment_id, splits)
122    }
123
124    pub fn new(cdc_scan_fragment_id: FragmentId, splits: Vec<CdcTableSnapshotSplitRaw>) -> Self {
125        Self::new_inner(
126            cdc_scan_fragment_id,
127            CdcTableSnapshotSplits::Backfilling(splits),
128        )
129    }
130
131    pub fn cdc_scan_fragment_id(&self) -> FragmentId {
132        self.cdc_scan_fragment_id
133    }
134
135    pub fn update_split_progress(&mut self, progress: &PbCdcTableBackfillProgress) {
136        tracing::debug!(?progress, "Complete split.");
137        let current_progress = match &mut self.status {
138            CdcBackfillStatus::Backfilling(progress) => progress,
139            CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => {
140                return;
141            }
142        };
143        assert_ne!(
144            progress.generation,
145            INVALID_CDC_SPLIT_ASSIGNMENT_GENERATION_ID
146        );
147        if current_progress.split_assignment_generation == progress.generation {
148            current_progress.split_backfilled_count +=
149                (1 + progress.split_id_end_inclusive - progress.split_id_start_inclusive) as u64;
150            if progress.done {
151                current_progress.split_completed_count += (1 + progress.split_id_end_inclusive
152                    - progress.split_id_start_inclusive)
153                    as u64;
154                if current_progress.split_completed_count == current_progress.splits.len() as u64 {
155                    self.status = CdcBackfillStatus::PreCompleted;
156                }
157            }
158        }
159    }
160
161    pub fn reassign_splits(
162        &mut self,
163        actor_ids: HashSet<ActorId>,
164    ) -> MetaResult<HashMap<ActorId, PbCdcTableSnapshotSplits>> {
165        let generation = self.next_generation;
166        self.next_generation += 1;
167        let splits = match &mut self.status {
168            CdcBackfillStatus::Backfilling(progress) => {
169                progress.split_backfilled_count = 0;
170                progress.split_completed_count = 0;
171                progress.split_assignment_generation = generation;
172                progress.splits.as_slice()
173            }
174            CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => {
175                static SINGLE_SPLIT: LazyLock<CdcTableSnapshotSplitRaw> =
176                    LazyLock::new(single_merged_split);
177                core::slice::from_ref(&*SINGLE_SPLIT)
178            }
179        };
180        assign_cdc_table_snapshot_splits(actor_ids, splits, generation)
181    }
182
183    pub fn gen_cdc_progress(&self) -> CdcProgress {
184        match &self.status {
185            CdcBackfillStatus::Backfilling(progress) => CdcProgress {
186                split_total_count: progress.splits.len() as _,
187                split_backfilled_count: progress.split_backfilled_count,
188                split_completed_count: progress.split_completed_count,
189            },
190            CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => CdcProgress {
191                split_total_count: 1,
192                split_backfilled_count: 1,
193                split_completed_count: 1,
194            },
195        }
196    }
197
198    pub fn take_pre_completed(&mut self) -> bool {
199        if let CdcBackfillStatus::PreCompleted = &self.status {
200            self.status = CdcBackfillStatus::Completed;
201            true
202        } else {
203            false
204        }
205    }
206}
207
#[cfg(test)]
mod test {

    use risingwave_connector::source::CdcTableSnapshotSplitRaw;
    use risingwave_pb::stream_service::barrier_complete_response::CdcTableBackfillProgress;

    use crate::barrier::cdc_progress::{
        CdcBackfillProgress, CdcBackfillStatus, CdcTableBackfillTracker,
    };

    impl CdcTableBackfillTracker {
        /// Test-only accessor for the in-flight counters; panics unless still backfilling.
        fn progress(&self) -> &CdcBackfillProgress {
            match &self.status {
                CdcBackfillStatus::Backfilling(progress) => progress,
                CdcBackfillStatus::PreCompleted | CdcBackfillStatus::Completed => unreachable!(),
            }
        }
    }

    #[tokio::test]
    async fn test_generation() {
        let split_count = 10u64;
        let splits: Vec<CdcTableSnapshotSplitRaw> = (0..split_count)
            .map(|split_id| CdcTableSnapshotSplitRaw {
                split_id: split_id as _,
                left_bound_inclusive: vec![],
                right_bound_exclusive: vec![],
            })
            .collect();
        let mut tracker = CdcTableBackfillTracker::new(233.into(), splits);
        assert_eq!(tracker.next_generation, 2);

        // First assignment uses generation 2.
        tracker
            .reassign_splits([1.into()].into_iter().collect())
            .unwrap();
        let first_generation = tracker.progress().split_assignment_generation;
        assert_eq!(first_generation, 2);
        assert_init_state(&tracker, split_count);
        let first_reports = [
            CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 1,
                split_id_end_inclusive: 2,
                generation: first_generation,
                fragment_id: 12.into(),
                ..Default::default()
            },
            CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 5,
                split_id_end_inclusive: 10,
                generation: first_generation,
                fragment_id: 11.into(),
                ..Default::default()
            },
        ];
        for report in &first_reports {
            tracker.update_split_progress(report);
        }
        assert_eq!(tracker.progress().split_completed_count, 8);

        // Reassigning bumps the generation and resets every counter.
        tracker
            .reassign_splits([1.into()].into_iter().collect())
            .unwrap();
        let second_generation = tracker.progress().split_assignment_generation;
        assert_eq!(second_generation, 3);
        assert_init_state(&tracker, split_count);

        // A report tagged with the superseded generation must be ignored.
        let stale_report = CdcTableBackfillProgress {
            done: true,
            split_id_start_inclusive: 3,
            split_id_end_inclusive: 4,
            generation: second_generation - 1,
            fragment_id: 13.into(),
            ..Default::default()
        };
        tracker.update_split_progress(&stale_report);
        assert_init_state(&tracker, split_count);
        assert_eq!(tracker.progress().split_completed_count, 0);

        // Reports under the current generation cover all splits and pre-complete the tracker.
        let second_reports = [
            CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 1,
                split_id_end_inclusive: 2,
                generation: second_generation,
                fragment_id: 12.into(),
                ..Default::default()
            },
            CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 5,
                split_id_end_inclusive: 10,
                generation: second_generation,
                fragment_id: 11.into(),
                ..Default::default()
            },
            CdcTableBackfillProgress {
                done: true,
                split_id_start_inclusive: 3,
                split_id_end_inclusive: 4,
                generation: second_generation,
                fragment_id: 13.into(),
                ..Default::default()
            },
        ];
        for report in &second_reports {
            tracker.update_split_progress(report);
        }
        assert!(tracker.take_pre_completed());
    }

    /// Asserts the tracker is backfilling `split_count` splits with no progress recorded yet.
    fn assert_init_state(tracker: &CdcTableBackfillTracker, split_count: u64) {
        let progress = tracker.progress();
        assert_eq!(progress.splits.len() as u64, split_count);
        assert_eq!(progress.split_completed_count, 0);
        assert_eq!(progress.split_backfilled_count, 0);
    }
}