1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
20
21use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
22use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
23use crate::task::cdc_progress::CdcTableBackfillState;
24use crate::task::{ActorId, LocalBarrierManager};
25
26type ConsumedEpoch = u64;
27type ConsumedRows = u64;
28type BufferedRows = u64;
29
30#[derive(Debug, Clone, Copy)]
31pub(crate) enum BackfillState {
32    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
33    DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
34    ConsumingLogStore { pending_epoch_lag: u64 },
35    DoneConsumingLogStore,
36}
37
38impl BackfillState {
39    pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
40        let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
41            BackfillState::ConsumingUpstreamTableOrSource(
42                consumed_epoch,
43                consumed_rows,
44                buffered_rows,
45            ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
46            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
47                (true, 0, consumed_rows, 0, buffered_rows)
48            }
49            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
50                (false, 0, 0, pending_epoch_lag, 0)
51            }
52            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
53        };
54        PbCreateMviewProgress {
55            backfill_actor_id: actor_id,
56            done,
57            consumed_epoch,
58            consumed_rows,
59            pending_epoch_lag,
60            buffered_rows,
61        }
62    }
63}
64
65impl Display for BackfillState {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        match self {
68            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
69                write!(
70                    f,
71                    "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
72                    epoch, rows, buffered
73                )
74            }
75            BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
76                write!(
77                    f,
78                    "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
79                    rows, buffered
80                )
81            }
82            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
83                write!(
84                    f,
85                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
86                )
87            }
88            BackfillState::DoneConsumingLogStore => {
89                write!(f, "DoneConsumingLogStore")
90            }
91        }
92    }
93}
94
95impl DatabaseManagedBarrierState {
96    pub(crate) fn update_create_mview_progress(
97        &mut self,
98        epoch: EpochPair,
99        actor: ActorId,
100        state: BackfillState,
101    ) {
102        if let Some(actor_state) = self.actor_states.get(&actor)
103            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
104            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
105        {
106            graph_state
107                .create_mview_progress
108                .entry(epoch.curr)
109                .or_default()
110                .insert(actor, state);
111        } else {
112            warn!(?epoch, actor, ?state, "ignore create mview progress");
113        }
114    }
115
116    pub(crate) fn update_cdc_table_backfill_progress(
117        &mut self,
118        epoch: EpochPair,
119        actor: ActorId,
120        state: CdcTableBackfillState,
121    ) {
122        if let Some(actor_state) = self.actor_states.get(&actor)
123            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
124            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
125        {
126            graph_state
127                .cdc_table_backfill_progress
128                .entry(epoch.curr)
129                .or_default()
130                .insert(actor, state);
131        } else {
132            warn!(?epoch, actor, ?state, "ignore CDC table backfill progress");
133        }
134    }
135}
136
137impl LocalBarrierManager {
138    fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
139        self.send_event(ReportCreateProgress {
140            epoch,
141            actor,
142            state,
143        })
144    }
145}
146
147pub struct CreateMviewProgressReporter {
178    barrier_manager: LocalBarrierManager,
179
180    backfill_actor_id: ActorId,
182
183    state: Option<BackfillState>,
184}
185
186impl CreateMviewProgressReporter {
187    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
188        Self {
189            barrier_manager,
190            backfill_actor_id,
191            state: None,
192        }
193    }
194
195    #[cfg(test)]
196    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
197        Self::new(barrier_manager, 0)
198    }
199
200    pub fn actor_id(&self) -> u32 {
201        self.backfill_actor_id
202    }
203
204    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
205        self.state = Some(state);
206        self.barrier_manager
207            .update_create_mview_progress(epoch, self.backfill_actor_id, state);
208    }
209
210    pub fn update(
215        &mut self,
216        epoch: EpochPair,
217        consumed_epoch: ConsumedEpoch,
218        current_consumed_rows: ConsumedRows,
219    ) {
220        self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
221    }
222
223    pub fn update_with_buffered_rows(
226        &mut self,
227        epoch: EpochPair,
228        consumed_epoch: ConsumedEpoch,
229        current_consumed_rows: ConsumedRows,
230        buffered_rows: BufferedRows,
231    ) {
232        match self.state {
233            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
234                assert!(
235                    last <= consumed_epoch,
236                    "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
237                    last,
238                    consumed_epoch
239                );
240                assert!(last_consumed_rows <= current_consumed_rows);
241            }
242            Some(state) => {
243                panic!(
244                    "should not update consuming progress at invalid state: {:?}",
245                    state
246                )
247            }
248            None => {}
249        };
250        tracing::debug!(
251            actor_id = self.backfill_actor_id,
252            ?epoch,
253            consumed_epoch,
254            current_consumed_rows,
255            buffered_rows,
256            "progress update"
257        );
258        self.update_inner(
259            epoch,
260            BackfillState::ConsumingUpstreamTableOrSource(
261                consumed_epoch,
262                current_consumed_rows,
263                buffered_rows,
264            ),
265        );
266    }
267
268    pub fn update_for_source_backfill(
271        &mut self,
272        epoch: EpochPair,
273        current_consumed_rows: ConsumedRows,
274    ) {
275        match self.state {
276            Some(BackfillState::ConsumingUpstreamTableOrSource(
277                dummy_last_epoch,
278                _last_consumed_rows,
279                _,
280            )) => {
281                debug_assert_eq!(dummy_last_epoch, 0);
282            }
283            Some(state) => {
284                panic!(
285                    "should not update consuming progress at invalid state: {:?}",
286                    state
287                )
288            }
289            None => {}
290        };
291        self.update_inner(
292            epoch,
293            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
295        );
296    }
297
298    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
301        self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
302    }
303
304    pub fn finish_with_buffered_rows(
307        &mut self,
308        epoch: EpochPair,
309        current_consumed_rows: ConsumedRows,
310        buffered_rows: BufferedRows,
311    ) {
312        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
313            return;
314        }
315        tracing::debug!(
316            actor_id = self.backfill_actor_id,
317            ?epoch,
318            current_consumed_rows,
319            buffered_rows,
320            "progress finish"
321        );
322        self.update_inner(
324            epoch,
325            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
326        );
327    }
328
329    pub(crate) fn update_create_mview_log_store_progress(
330        &mut self,
331        epoch: EpochPair,
332        pending_epoch_lag: u64,
333    ) {
334        assert_matches!(
335            self.state,
336            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
337                | Some(BackfillState::ConsumingLogStore { .. })
338                | None,
339            "cannot update log store progress at state {:?}",
340            self.state
341        );
342        self.update_inner(
343            epoch,
344            BackfillState::ConsumingLogStore { pending_epoch_lag },
345        );
346    }
347
348    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
349        assert_matches!(
350            self.state,
351            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
352                | Some(BackfillState::ConsumingLogStore { .. })
353                | None,
354            "cannot finish log store progress at state {:?}",
355            self.state
356        );
357        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
358    }
359}
360
361impl LocalBarrierManager {
362    pub(crate) fn register_create_mview_progress(
370        &self,
371        backfill_actor_id: ActorId,
372    ) -> CreateMviewProgressReporter {
373        trace!("register create mview progress: {}", backfill_actor_id);
374        CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
375    }
376}