1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
21
22use crate::executor::ActorContext;
23use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
24use crate::task::barrier_worker::managed_state::PartialGraphState;
25use crate::task::cdc_progress::CdcTableBackfillState;
26use crate::task::{ActorId, LocalBarrierManager};
27
28type ConsumedEpoch = u64;
29type ConsumedRows = u64;
30type BufferedRows = u64;
31
32#[derive(Debug, Clone, Copy)]
33pub(crate) enum BackfillState {
34 ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
35 DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
36 ConsumingLogStore { pending_epoch_lag: u64 },
37 DoneConsumingLogStore,
38}
39
40impl BackfillState {
41 pub fn to_pb(self, fragment_id: FragmentId, actor_id: ActorId) -> PbCreateMviewProgress {
42 let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
43 BackfillState::ConsumingUpstreamTableOrSource(
44 consumed_epoch,
45 consumed_rows,
46 buffered_rows,
47 ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
48 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
49 (true, 0, consumed_rows, 0, buffered_rows)
50 }
51 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
52 (false, 0, 0, pending_epoch_lag, 0)
53 }
54 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
55 };
56 PbCreateMviewProgress {
57 backfill_actor_id: actor_id,
58 done,
59 consumed_epoch,
60 consumed_rows,
61 pending_epoch_lag,
62 buffered_rows,
63 fragment_id,
64 }
65 }
66}
67
68impl Display for BackfillState {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 match self {
71 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
72 write!(
73 f,
74 "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
75 epoch, rows, buffered
76 )
77 }
78 BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
79 write!(
80 f,
81 "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
82 rows, buffered
83 )
84 }
85 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
86 write!(
87 f,
88 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
89 )
90 }
91 BackfillState::DoneConsumingLogStore => {
92 write!(f, "DoneConsumingLogStore")
93 }
94 }
95 }
96}
97
98impl PartialGraphState {
99 pub(crate) fn update_create_mview_progress(
100 &mut self,
101 epoch: EpochPair,
102 fragment_id: FragmentId,
103 actor: ActorId,
104 state: BackfillState,
105 ) {
106 if let Some((prev_fragment_id, _)) = self
107 .graph_state
108 .create_mview_progress
109 .entry(epoch.curr)
110 .or_default()
111 .insert(actor, (fragment_id, state))
112 {
113 assert_eq!(prev_fragment_id, fragment_id)
114 }
115 }
116
117 pub(crate) fn update_cdc_table_backfill_progress(
118 &mut self,
119 epoch: EpochPair,
120 actor: ActorId,
121 state: CdcTableBackfillState,
122 ) {
123 if let Some(actor_state) = self.actor_states.get(&actor)
124 && actor_state.inflight_barriers.contains(&epoch.prev)
125 {
126 self.graph_state
127 .cdc_table_backfill_progress
128 .entry(epoch.curr)
129 .or_default()
130 .insert(actor, state);
131 } else {
132 warn!(?epoch, %actor, ?state, "ignore CDC table backfill progress");
133 }
134 }
135}
136
137impl LocalBarrierManager {
138 fn update_create_mview_progress(
139 &self,
140 epoch: EpochPair,
141 fragment_id: FragmentId,
142 actor: ActorId,
143 state: BackfillState,
144 ) {
145 self.send_event(ReportCreateProgress {
146 epoch,
147 fragment_id,
148 actor,
149 state,
150 })
151 }
152}
153
154pub struct CreateMviewProgressReporter {
185 barrier_manager: LocalBarrierManager,
186
187 fragment_id: FragmentId,
188
189 backfill_actor_id: ActorId,
191
192 state: Option<BackfillState>,
193}
194
195impl CreateMviewProgressReporter {
196 pub fn new(
197 barrier_manager: LocalBarrierManager,
198 fragment_id: FragmentId,
199 backfill_actor_id: ActorId,
200 ) -> Self {
201 Self {
202 barrier_manager,
203 fragment_id,
204 backfill_actor_id,
205 state: None,
206 }
207 }
208
209 #[cfg(test)]
210 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
211 Self::new(barrier_manager, 0.into(), 0.into())
212 }
213
214 pub fn actor_id(&self) -> ActorId {
215 self.backfill_actor_id
216 }
217
218 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
219 self.state = Some(state);
220 self.barrier_manager.update_create_mview_progress(
221 epoch,
222 self.fragment_id,
223 self.backfill_actor_id,
224 state,
225 );
226 }
227
228 pub fn update(
233 &mut self,
234 epoch: EpochPair,
235 consumed_epoch: ConsumedEpoch,
236 current_consumed_rows: ConsumedRows,
237 ) {
238 self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
239 }
240
241 pub fn update_with_buffered_rows(
244 &mut self,
245 epoch: EpochPair,
246 consumed_epoch: ConsumedEpoch,
247 current_consumed_rows: ConsumedRows,
248 buffered_rows: BufferedRows,
249 ) {
250 match self.state {
251 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
252 assert!(
253 last <= consumed_epoch,
254 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
255 last,
256 consumed_epoch
257 );
258 assert!(last_consumed_rows <= current_consumed_rows);
259 }
260 Some(state) => {
261 panic!(
262 "should not update consuming progress at invalid state: {:?}",
263 state
264 )
265 }
266 None => {}
267 };
268 tracing::debug!(
269 actor_id = %self.backfill_actor_id,
270 ?epoch,
271 consumed_epoch,
272 current_consumed_rows,
273 buffered_rows,
274 "progress update"
275 );
276 self.update_inner(
277 epoch,
278 BackfillState::ConsumingUpstreamTableOrSource(
279 consumed_epoch,
280 current_consumed_rows,
281 buffered_rows,
282 ),
283 );
284 }
285
286 pub fn update_for_source_backfill(
289 &mut self,
290 epoch: EpochPair,
291 current_consumed_rows: ConsumedRows,
292 ) {
293 match self.state {
294 Some(BackfillState::ConsumingUpstreamTableOrSource(
295 dummy_last_epoch,
296 _last_consumed_rows,
297 _,
298 )) => {
299 debug_assert_eq!(dummy_last_epoch, 0);
300 }
301 Some(state) => {
302 panic!(
303 "should not update consuming progress at invalid state: {:?}",
304 state
305 )
306 }
307 None => {}
308 };
309 self.update_inner(
310 epoch,
311 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
313 );
314 }
315
316 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
319 self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
320 }
321
322 pub fn finish_with_buffered_rows(
325 &mut self,
326 epoch: EpochPair,
327 current_consumed_rows: ConsumedRows,
328 buffered_rows: BufferedRows,
329 ) {
330 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
331 return;
332 }
333 tracing::debug!(
334 actor_id = %self.backfill_actor_id,
335 ?epoch,
336 current_consumed_rows,
337 buffered_rows,
338 "progress finish"
339 );
340 self.update_inner(
342 epoch,
343 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
344 );
345 }
346
347 pub(crate) fn update_create_mview_log_store_progress(
348 &mut self,
349 epoch: EpochPair,
350 pending_epoch_lag: u64,
351 ) {
352 assert_matches!(
353 self.state,
354 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
355 | Some(BackfillState::ConsumingLogStore { .. })
356 | None,
357 "cannot update log store progress at state {:?}",
358 self.state
359 );
360 self.update_inner(
361 epoch,
362 BackfillState::ConsumingLogStore { pending_epoch_lag },
363 );
364 }
365
366 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
367 assert_matches!(
368 self.state,
369 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
370 | Some(BackfillState::ConsumingLogStore { .. })
371 | None,
372 "cannot finish log store progress at state {:?}",
373 self.state
374 );
375 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
376 }
377}
378
379impl LocalBarrierManager {
380 pub(crate) fn register_create_mview_progress(
388 &self,
389 actor_ctx: &ActorContext,
390 ) -> CreateMviewProgressReporter {
391 let fragment_id = actor_ctx.fragment_id;
392 let backfill_actor_id = actor_ctx.id;
393 trace!(%fragment_id, %backfill_actor_id, "register create mview progress");
394 CreateMviewProgressReporter::new(self.clone(), fragment_id, backfill_actor_id)
395 }
396}