1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
21
22use crate::executor::ActorContext;
23use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
24use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
25use crate::task::cdc_progress::CdcTableBackfillState;
26use crate::task::{ActorId, LocalBarrierManager};
27
/// Value reported as `consumed_epoch` in backfill progress.
type ConsumedEpoch = u64;
/// Value reported as `consumed_rows` in backfill progress.
type ConsumedRows = u64;
/// Value reported as `buffered_rows` in backfill progress.
type BufferedRows = u64;

/// Progress state that a backfill actor reports while a materialized view is being created.
///
/// The two `Done*` variants are the ones converted into a progress message with `done` set.
#[derive(Clone, Copy, Debug)]
pub(crate) enum BackfillState {
    /// Still consuming the upstream table or source; carries the consumed epoch, the
    /// consumed row count and the buffered row count.
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
    /// Finished consuming the upstream table or source; carries the consumed row count
    /// and the buffered row count.
    DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
    /// Consuming the log store, with the reported pending epoch lag.
    ConsumingLogStore { pending_epoch_lag: u64 },
    /// Finished consuming the log store.
    DoneConsumingLogStore,
}
39
40impl BackfillState {
41 pub fn to_pb(self, fragment_id: FragmentId, actor_id: ActorId) -> PbCreateMviewProgress {
42 let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
43 BackfillState::ConsumingUpstreamTableOrSource(
44 consumed_epoch,
45 consumed_rows,
46 buffered_rows,
47 ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
48 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
49 (true, 0, consumed_rows, 0, buffered_rows)
50 }
51 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
52 (false, 0, 0, pending_epoch_lag, 0)
53 }
54 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
55 };
56 PbCreateMviewProgress {
57 backfill_actor_id: actor_id,
58 done,
59 consumed_epoch,
60 consumed_rows,
61 pending_epoch_lag,
62 buffered_rows,
63 fragment_id,
64 }
65 }
66}
67
68impl Display for BackfillState {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 match self {
71 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
72 write!(
73 f,
74 "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
75 epoch, rows, buffered
76 )
77 }
78 BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
79 write!(
80 f,
81 "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
82 rows, buffered
83 )
84 }
85 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
86 write!(
87 f,
88 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
89 )
90 }
91 BackfillState::DoneConsumingLogStore => {
92 write!(f, "DoneConsumingLogStore")
93 }
94 }
95 }
96}
97
98impl DatabaseManagedBarrierState {
99 pub(crate) fn update_create_mview_progress(
100 &mut self,
101 epoch: EpochPair,
102 fragment_id: FragmentId,
103 actor: ActorId,
104 state: BackfillState,
105 ) {
106 if let Some(actor_state) = self.actor_states.get(&actor)
107 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
108 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
109 {
110 if let Some((prev_fragment_id, _)) = graph_state
111 .create_mview_progress
112 .entry(epoch.curr)
113 .or_default()
114 .insert(actor, (fragment_id, state))
115 {
116 assert_eq!(prev_fragment_id, fragment_id)
117 }
118 } else {
119 warn!(?epoch, %actor, ?state, "ignore create mview progress");
120 }
121 }
122
123 pub(crate) fn update_cdc_table_backfill_progress(
124 &mut self,
125 epoch: EpochPair,
126 actor: ActorId,
127 state: CdcTableBackfillState,
128 ) {
129 if let Some(actor_state) = self.actor_states.get(&actor)
130 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
131 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
132 {
133 graph_state
134 .cdc_table_backfill_progress
135 .entry(epoch.curr)
136 .or_default()
137 .insert(actor, state);
138 } else {
139 warn!(?epoch, %actor, ?state, "ignore CDC table backfill progress");
140 }
141 }
142}
143
144impl LocalBarrierManager {
145 fn update_create_mview_progress(
146 &self,
147 epoch: EpochPair,
148 fragment_id: FragmentId,
149 actor: ActorId,
150 state: BackfillState,
151 ) {
152 self.send_event(ReportCreateProgress {
153 epoch,
154 fragment_id,
155 actor,
156 state,
157 })
158 }
159}
160
161pub struct CreateMviewProgressReporter {
192 barrier_manager: LocalBarrierManager,
193
194 fragment_id: FragmentId,
195
196 backfill_actor_id: ActorId,
198
199 state: Option<BackfillState>,
200}
201
202impl CreateMviewProgressReporter {
203 pub fn new(
204 barrier_manager: LocalBarrierManager,
205 fragment_id: FragmentId,
206 backfill_actor_id: ActorId,
207 ) -> Self {
208 Self {
209 barrier_manager,
210 fragment_id,
211 backfill_actor_id,
212 state: None,
213 }
214 }
215
216 #[cfg(test)]
217 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
218 Self::new(barrier_manager, 0.into(), 0.into())
219 }
220
221 pub fn actor_id(&self) -> ActorId {
222 self.backfill_actor_id
223 }
224
225 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
226 self.state = Some(state);
227 self.barrier_manager.update_create_mview_progress(
228 epoch,
229 self.fragment_id,
230 self.backfill_actor_id,
231 state,
232 );
233 }
234
235 pub fn update(
240 &mut self,
241 epoch: EpochPair,
242 consumed_epoch: ConsumedEpoch,
243 current_consumed_rows: ConsumedRows,
244 ) {
245 self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
246 }
247
248 pub fn update_with_buffered_rows(
251 &mut self,
252 epoch: EpochPair,
253 consumed_epoch: ConsumedEpoch,
254 current_consumed_rows: ConsumedRows,
255 buffered_rows: BufferedRows,
256 ) {
257 match self.state {
258 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
259 assert!(
260 last <= consumed_epoch,
261 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
262 last,
263 consumed_epoch
264 );
265 assert!(last_consumed_rows <= current_consumed_rows);
266 }
267 Some(state) => {
268 panic!(
269 "should not update consuming progress at invalid state: {:?}",
270 state
271 )
272 }
273 None => {}
274 };
275 tracing::debug!(
276 actor_id = %self.backfill_actor_id,
277 ?epoch,
278 consumed_epoch,
279 current_consumed_rows,
280 buffered_rows,
281 "progress update"
282 );
283 self.update_inner(
284 epoch,
285 BackfillState::ConsumingUpstreamTableOrSource(
286 consumed_epoch,
287 current_consumed_rows,
288 buffered_rows,
289 ),
290 );
291 }
292
293 pub fn update_for_source_backfill(
296 &mut self,
297 epoch: EpochPair,
298 current_consumed_rows: ConsumedRows,
299 ) {
300 match self.state {
301 Some(BackfillState::ConsumingUpstreamTableOrSource(
302 dummy_last_epoch,
303 _last_consumed_rows,
304 _,
305 )) => {
306 debug_assert_eq!(dummy_last_epoch, 0);
307 }
308 Some(state) => {
309 panic!(
310 "should not update consuming progress at invalid state: {:?}",
311 state
312 )
313 }
314 None => {}
315 };
316 self.update_inner(
317 epoch,
318 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
320 );
321 }
322
323 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
326 self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
327 }
328
329 pub fn finish_with_buffered_rows(
332 &mut self,
333 epoch: EpochPair,
334 current_consumed_rows: ConsumedRows,
335 buffered_rows: BufferedRows,
336 ) {
337 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
338 return;
339 }
340 tracing::debug!(
341 actor_id = %self.backfill_actor_id,
342 ?epoch,
343 current_consumed_rows,
344 buffered_rows,
345 "progress finish"
346 );
347 self.update_inner(
349 epoch,
350 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
351 );
352 }
353
354 pub(crate) fn update_create_mview_log_store_progress(
355 &mut self,
356 epoch: EpochPair,
357 pending_epoch_lag: u64,
358 ) {
359 assert_matches!(
360 self.state,
361 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
362 | Some(BackfillState::ConsumingLogStore { .. })
363 | None,
364 "cannot update log store progress at state {:?}",
365 self.state
366 );
367 self.update_inner(
368 epoch,
369 BackfillState::ConsumingLogStore { pending_epoch_lag },
370 );
371 }
372
373 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
374 assert_matches!(
375 self.state,
376 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
377 | Some(BackfillState::ConsumingLogStore { .. })
378 | None,
379 "cannot finish log store progress at state {:?}",
380 self.state
381 );
382 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
383 }
384}
385
386impl LocalBarrierManager {
387 pub(crate) fn register_create_mview_progress(
395 &self,
396 actor_ctx: &ActorContext,
397 ) -> CreateMviewProgressReporter {
398 let fragment_id = actor_ctx.fragment_id;
399 let backfill_actor_id = actor_ctx.id;
400 trace!(%fragment_id, %backfill_actor_id, "register create mview progress");
401 CreateMviewProgressReporter::new(self.clone(), fragment_id, backfill_actor_id)
402 }
403}