1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
20
21use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
22use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
23use crate::task::cdc_progress::CdcTableBackfillState;
24use crate::task::{ActorId, LocalBarrierManager};
25
26type ConsumedEpoch = u64;
27type ConsumedRows = u64;
28type BufferedRows = u64;
29
30#[derive(Debug, Clone, Copy)]
31pub(crate) enum BackfillState {
32 ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
33 DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
34 ConsumingLogStore { pending_epoch_lag: u64 },
35 DoneConsumingLogStore,
36}
37
38impl BackfillState {
39 pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
40 let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
41 BackfillState::ConsumingUpstreamTableOrSource(
42 consumed_epoch,
43 consumed_rows,
44 buffered_rows,
45 ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
46 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
47 (true, 0, consumed_rows, 0, buffered_rows)
48 }
49 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
50 (false, 0, 0, pending_epoch_lag, 0)
51 }
52 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
53 };
54 PbCreateMviewProgress {
55 backfill_actor_id: actor_id,
56 done,
57 consumed_epoch,
58 consumed_rows,
59 pending_epoch_lag,
60 buffered_rows,
61 }
62 }
63}
64
65impl Display for BackfillState {
66 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67 match self {
68 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
69 write!(
70 f,
71 "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
72 epoch, rows, buffered
73 )
74 }
75 BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
76 write!(
77 f,
78 "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
79 rows, buffered
80 )
81 }
82 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
83 write!(
84 f,
85 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
86 )
87 }
88 BackfillState::DoneConsumingLogStore => {
89 write!(f, "DoneConsumingLogStore")
90 }
91 }
92 }
93}
94
95impl DatabaseManagedBarrierState {
96 pub(crate) fn update_create_mview_progress(
97 &mut self,
98 epoch: EpochPair,
99 actor: ActorId,
100 state: BackfillState,
101 ) {
102 if let Some(actor_state) = self.actor_states.get(&actor)
103 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
104 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
105 {
106 graph_state
107 .create_mview_progress
108 .entry(epoch.curr)
109 .or_default()
110 .insert(actor, state);
111 } else {
112 warn!(?epoch, actor, ?state, "ignore create mview progress");
113 }
114 }
115
116 pub(crate) fn update_cdc_table_backfill_progress(
117 &mut self,
118 epoch: EpochPair,
119 actor: ActorId,
120 state: CdcTableBackfillState,
121 ) {
122 if let Some(actor_state) = self.actor_states.get(&actor)
123 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
124 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
125 {
126 graph_state
127 .cdc_table_backfill_progress
128 .entry(epoch.curr)
129 .or_default()
130 .insert(actor, state);
131 } else {
132 warn!(?epoch, actor, ?state, "ignore CDC table backfill progress");
133 }
134 }
135}
136
137impl LocalBarrierManager {
138 fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
139 self.send_event(ReportCreateProgress {
140 epoch,
141 actor,
142 state,
143 })
144 }
145}
146
147pub struct CreateMviewProgressReporter {
178 barrier_manager: LocalBarrierManager,
179
180 backfill_actor_id: ActorId,
182
183 state: Option<BackfillState>,
184}
185
186impl CreateMviewProgressReporter {
187 pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
188 Self {
189 barrier_manager,
190 backfill_actor_id,
191 state: None,
192 }
193 }
194
195 #[cfg(test)]
196 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
197 Self::new(barrier_manager, 0)
198 }
199
200 pub fn actor_id(&self) -> u32 {
201 self.backfill_actor_id
202 }
203
204 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
205 self.state = Some(state);
206 self.barrier_manager
207 .update_create_mview_progress(epoch, self.backfill_actor_id, state);
208 }
209
210 pub fn update(
215 &mut self,
216 epoch: EpochPair,
217 consumed_epoch: ConsumedEpoch,
218 current_consumed_rows: ConsumedRows,
219 ) {
220 self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
221 }
222
223 pub fn update_with_buffered_rows(
226 &mut self,
227 epoch: EpochPair,
228 consumed_epoch: ConsumedEpoch,
229 current_consumed_rows: ConsumedRows,
230 buffered_rows: BufferedRows,
231 ) {
232 match self.state {
233 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
234 assert!(
235 last <= consumed_epoch,
236 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
237 last,
238 consumed_epoch
239 );
240 assert!(last_consumed_rows <= current_consumed_rows);
241 }
242 Some(state) => {
243 panic!(
244 "should not update consuming progress at invalid state: {:?}",
245 state
246 )
247 }
248 None => {}
249 };
250 tracing::debug!(
251 actor_id = self.backfill_actor_id,
252 ?epoch,
253 consumed_epoch,
254 current_consumed_rows,
255 buffered_rows,
256 "progress update"
257 );
258 self.update_inner(
259 epoch,
260 BackfillState::ConsumingUpstreamTableOrSource(
261 consumed_epoch,
262 current_consumed_rows,
263 buffered_rows,
264 ),
265 );
266 }
267
268 pub fn update_for_source_backfill(
271 &mut self,
272 epoch: EpochPair,
273 current_consumed_rows: ConsumedRows,
274 ) {
275 match self.state {
276 Some(BackfillState::ConsumingUpstreamTableOrSource(
277 dummy_last_epoch,
278 _last_consumed_rows,
279 _,
280 )) => {
281 debug_assert_eq!(dummy_last_epoch, 0);
282 }
283 Some(state) => {
284 panic!(
285 "should not update consuming progress at invalid state: {:?}",
286 state
287 )
288 }
289 None => {}
290 };
291 self.update_inner(
292 epoch,
293 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
295 );
296 }
297
298 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
301 self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
302 }
303
304 pub fn finish_with_buffered_rows(
307 &mut self,
308 epoch: EpochPair,
309 current_consumed_rows: ConsumedRows,
310 buffered_rows: BufferedRows,
311 ) {
312 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
313 return;
314 }
315 tracing::debug!(
316 actor_id = self.backfill_actor_id,
317 ?epoch,
318 current_consumed_rows,
319 buffered_rows,
320 "progress finish"
321 );
322 self.update_inner(
324 epoch,
325 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
326 );
327 }
328
329 pub(crate) fn update_create_mview_log_store_progress(
330 &mut self,
331 epoch: EpochPair,
332 pending_epoch_lag: u64,
333 ) {
334 assert_matches!(
335 self.state,
336 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
337 | Some(BackfillState::ConsumingLogStore { .. })
338 | None,
339 "cannot update log store progress at state {:?}",
340 self.state
341 );
342 self.update_inner(
343 epoch,
344 BackfillState::ConsumingLogStore { pending_epoch_lag },
345 );
346 }
347
348 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
349 assert_matches!(
350 self.state,
351 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
352 | Some(BackfillState::ConsumingLogStore { .. })
353 | None,
354 "cannot finish log store progress at state {:?}",
355 self.state
356 );
357 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
358 }
359}
360
361impl LocalBarrierManager {
362 pub(crate) fn register_create_mview_progress(
370 &self,
371 backfill_actor_id: ActorId,
372 ) -> CreateMviewProgressReporter {
373 trace!("register create mview progress: {}", backfill_actor_id);
374 CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
375 }
376}