risingwave_stream/task/barrier_manager/
cdc_progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::epoch::EpochPair;
16use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
17
18use crate::task::barrier_manager::LocalBarrierEvent::ReportCdcTableBackfillProgress;
19use crate::task::{ActorId, FragmentId, LocalBarrierManager};
20
21#[derive(Debug, Clone, Copy)]
22pub(crate) enum CdcTableBackfillState {
23    Update {
24        fragment_id: FragmentId,
25        split_id_start_inclusive: i64,
26        split_id_end_inclusive: i64,
27        generation: u64,
28    },
29    Finish {
30        fragment_id: FragmentId,
31        split_id_start_inclusive: i64,
32        split_id_end_inclusive: i64,
33        generation: u64,
34    },
35}
36
37impl CdcTableBackfillState {
38    pub fn to_pb(self, actor_id: ActorId, epoch: u64) -> PbCdcTableBackfillProgress {
39        match self {
40            CdcTableBackfillState::Update {
41                fragment_id,
42                split_id_start_inclusive,
43                split_id_end_inclusive,
44                generation,
45            } => PbCdcTableBackfillProgress {
46                actor_id,
47                epoch,
48                done: false,
49                split_id_start_inclusive,
50                split_id_end_inclusive,
51                generation,
52                fragment_id,
53            },
54            CdcTableBackfillState::Finish {
55                fragment_id,
56                split_id_start_inclusive,
57                split_id_end_inclusive,
58                generation,
59            } => PbCdcTableBackfillProgress {
60                actor_id,
61                epoch,
62                done: true,
63                split_id_start_inclusive,
64                split_id_end_inclusive,
65                generation,
66                fragment_id,
67            },
68        }
69    }
70}
71
72pub struct CdcProgressReporter {
73    barrier_manager: LocalBarrierManager,
74}
75
76impl CdcProgressReporter {
77    pub fn new(barrier_manager: LocalBarrierManager) -> Self {
78        Self { barrier_manager }
79    }
80
81    pub fn update(
82        &self,
83        fragment_id: FragmentId,
84        actor_id: ActorId,
85        epoch: EpochPair,
86        generation: u64,
87        split_id_range: (i64, i64),
88    ) {
89        self.barrier_manager.update_cdc_backfill_progress(
90            actor_id,
91            epoch,
92            CdcTableBackfillState::Update {
93                fragment_id,
94                split_id_start_inclusive: split_id_range.0,
95                split_id_end_inclusive: split_id_range.1,
96                generation,
97            },
98        );
99    }
100
101    pub fn finish(
102        &self,
103        fragment_id: FragmentId,
104        actor_id: ActorId,
105        epoch: EpochPair,
106        generation: u64,
107        split_id_range: (i64, i64),
108    ) {
109        self.barrier_manager.update_cdc_backfill_progress(
110            actor_id,
111            epoch,
112            CdcTableBackfillState::Finish {
113                fragment_id,
114                split_id_start_inclusive: split_id_range.0,
115                split_id_end_inclusive: split_id_range.1,
116                generation,
117            },
118        );
119    }
120}
121
122impl LocalBarrierManager {
123    fn update_cdc_backfill_progress(
124        &self,
125        actor_id: ActorId,
126        epoch: EpochPair,
127        state: CdcTableBackfillState,
128    ) {
129        self.send_event(ReportCdcTableBackfillProgress {
130            actor_id,
131            epoch,
132            state,
133        })
134    }
135}