1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
21
22use crate::executor::ActorContext;
23use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
24use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
25use crate::task::cdc_progress::CdcTableBackfillState;
26use crate::task::{ActorId, LocalBarrierManager};
27
28type ConsumedEpoch = u64;
29type ConsumedRows = u64;
30type BufferedRows = u64;
31
32#[derive(Debug, Clone, Copy)]
33pub(crate) enum BackfillState {
34 ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
35 DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
36 ConsumingLogStore { pending_epoch_lag: u64 },
37 DoneConsumingLogStore,
38}
39
40impl BackfillState {
41 pub fn to_pb(self, fragment_id: FragmentId, actor_id: ActorId) -> PbCreateMviewProgress {
42 let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
43 BackfillState::ConsumingUpstreamTableOrSource(
44 consumed_epoch,
45 consumed_rows,
46 buffered_rows,
47 ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
48 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
49 (true, 0, consumed_rows, 0, buffered_rows)
50 }
51 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
52 (false, 0, 0, pending_epoch_lag, 0)
53 }
54 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
55 };
56 PbCreateMviewProgress {
57 backfill_actor_id: actor_id,
58 done,
59 consumed_epoch,
60 consumed_rows,
61 pending_epoch_lag,
62 buffered_rows,
63 fragment_id,
64 }
65 }
66}
67
68impl Display for BackfillState {
69 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70 match self {
71 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
72 write!(
73 f,
74 "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
75 epoch, rows, buffered
76 )
77 }
78 BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
79 write!(
80 f,
81 "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
82 rows, buffered
83 )
84 }
85 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
86 write!(
87 f,
88 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
89 )
90 }
91 BackfillState::DoneConsumingLogStore => {
92 write!(f, "DoneConsumingLogStore")
93 }
94 }
95 }
96}
97
98impl DatabaseManagedBarrierState {
99 pub(crate) fn update_create_mview_progress(
100 &mut self,
101 epoch: EpochPair,
102 fragment_id: FragmentId,
103 actor: ActorId,
104 state: BackfillState,
105 ) {
106 if let Some(actor_state) = self.actor_states.get(&actor)
107 && let Some(graph_state) = self.graph_states.get_mut(&actor_state.partial_graph_id)
108 {
109 if let Some((prev_fragment_id, _)) = graph_state
110 .create_mview_progress
111 .entry(epoch.curr)
112 .or_default()
113 .insert(actor, (fragment_id, state))
114 {
115 assert_eq!(prev_fragment_id, fragment_id)
116 }
117 } else {
118 warn!(?epoch, %actor, ?state, "ignore create mview progress");
119 }
120 }
121
122 pub(crate) fn update_cdc_table_backfill_progress(
123 &mut self,
124 epoch: EpochPair,
125 actor: ActorId,
126 state: CdcTableBackfillState,
127 ) {
128 if let Some(actor_state) = self.actor_states.get(&actor)
129 && let Some(graph_state) = self.graph_states.get_mut(&actor_state.partial_graph_id)
130 {
131 graph_state
132 .cdc_table_backfill_progress
133 .entry(epoch.curr)
134 .or_default()
135 .insert(actor, state);
136 } else {
137 warn!(?epoch, %actor, ?state, "ignore CDC table backfill progress");
138 }
139 }
140}
141
142impl LocalBarrierManager {
143 fn update_create_mview_progress(
144 &self,
145 epoch: EpochPair,
146 fragment_id: FragmentId,
147 actor: ActorId,
148 state: BackfillState,
149 ) {
150 self.send_event(ReportCreateProgress {
151 epoch,
152 fragment_id,
153 actor,
154 state,
155 })
156 }
157}
158
159pub struct CreateMviewProgressReporter {
190 barrier_manager: LocalBarrierManager,
191
192 fragment_id: FragmentId,
193
194 backfill_actor_id: ActorId,
196
197 state: Option<BackfillState>,
198}
199
200impl CreateMviewProgressReporter {
201 pub fn new(
202 barrier_manager: LocalBarrierManager,
203 fragment_id: FragmentId,
204 backfill_actor_id: ActorId,
205 ) -> Self {
206 Self {
207 barrier_manager,
208 fragment_id,
209 backfill_actor_id,
210 state: None,
211 }
212 }
213
214 #[cfg(test)]
215 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
216 Self::new(barrier_manager, 0.into(), 0.into())
217 }
218
219 pub fn actor_id(&self) -> ActorId {
220 self.backfill_actor_id
221 }
222
223 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
224 self.state = Some(state);
225 self.barrier_manager.update_create_mview_progress(
226 epoch,
227 self.fragment_id,
228 self.backfill_actor_id,
229 state,
230 );
231 }
232
233 pub fn update(
238 &mut self,
239 epoch: EpochPair,
240 consumed_epoch: ConsumedEpoch,
241 current_consumed_rows: ConsumedRows,
242 ) {
243 self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
244 }
245
246 pub fn update_with_buffered_rows(
249 &mut self,
250 epoch: EpochPair,
251 consumed_epoch: ConsumedEpoch,
252 current_consumed_rows: ConsumedRows,
253 buffered_rows: BufferedRows,
254 ) {
255 match self.state {
256 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
257 assert!(
258 last <= consumed_epoch,
259 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
260 last,
261 consumed_epoch
262 );
263 assert!(last_consumed_rows <= current_consumed_rows);
264 }
265 Some(state) => {
266 panic!(
267 "should not update consuming progress at invalid state: {:?}",
268 state
269 )
270 }
271 None => {}
272 };
273 tracing::debug!(
274 actor_id = %self.backfill_actor_id,
275 ?epoch,
276 consumed_epoch,
277 current_consumed_rows,
278 buffered_rows,
279 "progress update"
280 );
281 self.update_inner(
282 epoch,
283 BackfillState::ConsumingUpstreamTableOrSource(
284 consumed_epoch,
285 current_consumed_rows,
286 buffered_rows,
287 ),
288 );
289 }
290
291 pub fn update_for_source_backfill(
294 &mut self,
295 epoch: EpochPair,
296 current_consumed_rows: ConsumedRows,
297 ) {
298 match self.state {
299 Some(BackfillState::ConsumingUpstreamTableOrSource(
300 dummy_last_epoch,
301 _last_consumed_rows,
302 _,
303 )) => {
304 debug_assert_eq!(dummy_last_epoch, 0);
305 }
306 Some(state) => {
307 panic!(
308 "should not update consuming progress at invalid state: {:?}",
309 state
310 )
311 }
312 None => {}
313 };
314 self.update_inner(
315 epoch,
316 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
318 );
319 }
320
321 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
324 self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
325 }
326
327 pub fn finish_with_buffered_rows(
330 &mut self,
331 epoch: EpochPair,
332 current_consumed_rows: ConsumedRows,
333 buffered_rows: BufferedRows,
334 ) {
335 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
336 return;
337 }
338 tracing::debug!(
339 actor_id = %self.backfill_actor_id,
340 ?epoch,
341 current_consumed_rows,
342 buffered_rows,
343 "progress finish"
344 );
345 self.update_inner(
347 epoch,
348 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
349 );
350 }
351
352 pub(crate) fn update_create_mview_log_store_progress(
353 &mut self,
354 epoch: EpochPair,
355 pending_epoch_lag: u64,
356 ) {
357 assert_matches!(
358 self.state,
359 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
360 | Some(BackfillState::ConsumingLogStore { .. })
361 | None,
362 "cannot update log store progress at state {:?}",
363 self.state
364 );
365 self.update_inner(
366 epoch,
367 BackfillState::ConsumingLogStore { pending_epoch_lag },
368 );
369 }
370
371 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
372 assert_matches!(
373 self.state,
374 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
375 | Some(BackfillState::ConsumingLogStore { .. })
376 | None,
377 "cannot finish log store progress at state {:?}",
378 self.state
379 );
380 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
381 }
382}
383
384impl LocalBarrierManager {
385 pub(crate) fn register_create_mview_progress(
393 &self,
394 actor_ctx: &ActorContext,
395 ) -> CreateMviewProgressReporter {
396 let fragment_id = actor_ctx.fragment_id;
397 let backfill_actor_id = actor_ctx.id;
398 trace!(%fragment_id, %backfill_actor_id, "register create mview progress");
399 CreateMviewProgressReporter::new(self.clone(), fragment_id, backfill_actor_id)
400 }
401}