risingwave_stream/task/barrier_manager/
cdc_progress.rs1use risingwave_common::util::epoch::EpochPair;
16use risingwave_pb::stream_service::barrier_complete_response::PbCdcTableBackfillProgress;
17
18use crate::task::barrier_manager::LocalBarrierEvent::ReportCdcTableBackfillProgress;
19use crate::task::{ActorId, FragmentId, LocalBarrierManager};
20
21#[derive(Debug, Clone, Copy)]
22pub(crate) enum CdcTableBackfillState {
23 Update {
24 fragment_id: FragmentId,
25 split_id_start_inclusive: i64,
26 split_id_end_inclusive: i64,
27 generation: u64,
28 },
29 Finish {
30 fragment_id: FragmentId,
31 split_id_start_inclusive: i64,
32 split_id_end_inclusive: i64,
33 generation: u64,
34 },
35}
36
37impl CdcTableBackfillState {
38 pub fn to_pb(self, actor_id: ActorId, epoch: u64) -> PbCdcTableBackfillProgress {
39 match self {
40 CdcTableBackfillState::Update {
41 fragment_id,
42 split_id_start_inclusive,
43 split_id_end_inclusive,
44 generation,
45 } => PbCdcTableBackfillProgress {
46 actor_id,
47 epoch,
48 done: false,
49 split_id_start_inclusive,
50 split_id_end_inclusive,
51 generation,
52 fragment_id,
53 },
54 CdcTableBackfillState::Finish {
55 fragment_id,
56 split_id_start_inclusive,
57 split_id_end_inclusive,
58 generation,
59 } => PbCdcTableBackfillProgress {
60 actor_id,
61 epoch,
62 done: true,
63 split_id_start_inclusive,
64 split_id_end_inclusive,
65 generation,
66 fragment_id,
67 },
68 }
69 }
70}
71
72pub struct CdcProgressReporter {
73 barrier_manager: LocalBarrierManager,
74}
75
76impl CdcProgressReporter {
77 pub fn new(barrier_manager: LocalBarrierManager) -> Self {
78 Self { barrier_manager }
79 }
80
81 pub fn update(
82 &self,
83 fragment_id: FragmentId,
84 actor_id: ActorId,
85 epoch: EpochPair,
86 generation: u64,
87 split_id_range: (i64, i64),
88 ) {
89 self.barrier_manager.update_cdc_backfill_progress(
90 actor_id,
91 epoch,
92 CdcTableBackfillState::Update {
93 fragment_id,
94 split_id_start_inclusive: split_id_range.0,
95 split_id_end_inclusive: split_id_range.1,
96 generation,
97 },
98 );
99 }
100
101 pub fn finish(
102 &self,
103 fragment_id: FragmentId,
104 actor_id: ActorId,
105 epoch: EpochPair,
106 generation: u64,
107 split_id_range: (i64, i64),
108 ) {
109 self.barrier_manager.update_cdc_backfill_progress(
110 actor_id,
111 epoch,
112 CdcTableBackfillState::Finish {
113 fragment_id,
114 split_id_start_inclusive: split_id_range.0,
115 split_id_end_inclusive: split_id_range.1,
116 generation,
117 },
118 );
119 }
120}
121
122impl LocalBarrierManager {
123 fn update_cdc_backfill_progress(
124 &self,
125 actor_id: ActorId,
126 epoch: EpochPair,
127 state: CdcTableBackfillState,
128 ) {
129 self.send_event(ReportCdcTableBackfillProgress {
130 actor_id,
131 epoch,
132 state,
133 })
134 }
135}