risingwave_stream/task/barrier_manager/
progress.rs1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
20
21use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
22use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
23use crate::task::cdc_progress::CdcTableBackfillState;
24use crate::task::{ActorId, LocalBarrierManager};
25
/// Epoch value carried by the "consuming upstream" progress state.
type ConsumedEpoch = u64;
/// Row count carried by the upstream progress states.
type ConsumedRows = u64;

/// Progress state of a backfill actor.
///
/// The state is `Copy`, so it can be both remembered by a reporter and handed
/// to the barrier manager without cloning.
#[derive(Clone, Copy, Debug)]
pub(crate) enum BackfillState {
    /// Still consuming the upstream table or source; carries the consumed
    /// epoch and the number of rows consumed so far.
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
    /// Finished consuming the upstream table or source; carries the number of
    /// rows consumed.
    DoneConsumingUpstreamTableOrSource(ConsumedRows),
    /// Consuming the log store; carries the pending epoch lag.
    ConsumingLogStore { pending_epoch_lag: u64 },
    /// Finished consuming the log store.
    DoneConsumingLogStore,
}
36
37impl BackfillState {
38 pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
39 let (done, consumed_epoch, consumed_rows, pending_epoch_lag) = match self {
40 BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
41 (false, consumed_epoch, consumed_rows, 0)
42 }
43 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
44 (true, 0, consumed_rows, 0)
45 } BackfillState::ConsumingLogStore { pending_epoch_lag } => {
47 (false, 0, 0, pending_epoch_lag)
48 }
49 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
50 };
51 PbCreateMviewProgress {
52 backfill_actor_id: actor_id,
53 done,
54 consumed_epoch,
55 consumed_rows,
56 pending_epoch_lag,
57 }
58 }
59}
60
61impl Display for BackfillState {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 match self {
64 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
65 write!(
66 f,
67 "ConsumingUpstreamTable(epoch: {}, rows: {})",
68 epoch, rows
69 )
70 }
71 BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
72 write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
73 }
74 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
75 write!(
76 f,
77 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
78 )
79 }
80 BackfillState::DoneConsumingLogStore => {
81 write!(f, "DoneConsumingLogStore")
82 }
83 }
84 }
85}
86
87impl DatabaseManagedBarrierState {
88 pub(crate) fn update_create_mview_progress(
89 &mut self,
90 epoch: EpochPair,
91 actor: ActorId,
92 state: BackfillState,
93 ) {
94 if let Some(actor_state) = self.actor_states.get(&actor)
95 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
96 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
97 {
98 graph_state
99 .create_mview_progress
100 .entry(epoch.curr)
101 .or_default()
102 .insert(actor, state);
103 } else {
104 warn!(?epoch, actor, ?state, "ignore create mview progress");
105 }
106 }
107
108 pub(crate) fn update_cdc_table_backfill_progress(
109 &mut self,
110 epoch: EpochPair,
111 actor: ActorId,
112 state: CdcTableBackfillState,
113 ) {
114 if let Some(actor_state) = self.actor_states.get(&actor)
115 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
116 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
117 {
118 graph_state
119 .cdc_table_backfill_progress
120 .entry(epoch.curr)
121 .or_default()
122 .insert(actor, state);
123 } else {
124 warn!(?epoch, actor, ?state, "ignore CDC table backfill progress");
125 }
126 }
127}
128
129impl LocalBarrierManager {
130 fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
131 self.send_event(ReportCreateProgress {
132 epoch,
133 actor,
134 state,
135 })
136 }
137}
138
/// Reports the create-mview (backfill) progress of one backfill actor.
///
/// Each reported state is remembered in `state` and forwarded through
/// `barrier_manager`.
pub struct CreateMviewProgressReporter {
    /// Handle used to send progress events.
    barrier_manager: LocalBarrierManager,

    /// Id of the backfill actor whose progress is reported.
    backfill_actor_id: ActorId,

    /// Most recently reported state; `None` until the first report.
    state: Option<BackfillState>,
}
177
178impl CreateMviewProgressReporter {
179 pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
180 Self {
181 barrier_manager,
182 backfill_actor_id,
183 state: None,
184 }
185 }
186
187 #[cfg(test)]
188 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
189 Self::new(barrier_manager, 0)
190 }
191
192 pub fn actor_id(&self) -> u32 {
193 self.backfill_actor_id
194 }
195
196 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
197 self.state = Some(state);
198 self.barrier_manager
199 .update_create_mview_progress(epoch, self.backfill_actor_id, state);
200 }
201
202 pub fn update(
207 &mut self,
208 epoch: EpochPair,
209 consumed_epoch: ConsumedEpoch,
210 current_consumed_rows: ConsumedRows,
211 ) {
212 match self.state {
213 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
214 assert!(
215 last <= consumed_epoch,
216 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
217 last,
218 consumed_epoch
219 );
220 assert!(last_consumed_rows <= current_consumed_rows);
221 }
222 Some(state) => {
223 panic!(
224 "should not update consuming progress at invalid state: {:?}",
225 state
226 )
227 }
228 None => {}
229 };
230 tracing::debug!(
231 actor_id = self.backfill_actor_id,
232 ?epoch,
233 consumed_epoch,
234 current_consumed_rows,
235 "progress update"
236 );
237 self.update_inner(
238 epoch,
239 BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
240 );
241 }
242
243 pub fn update_for_source_backfill(
246 &mut self,
247 epoch: EpochPair,
248 current_consumed_rows: ConsumedRows,
249 ) {
250 match self.state {
251 Some(BackfillState::ConsumingUpstreamTableOrSource(
252 dummy_last_epoch,
253 _last_consumed_rows,
254 )) => {
255 debug_assert_eq!(dummy_last_epoch, 0);
256 }
257 Some(state) => {
258 panic!(
259 "should not update consuming progress at invalid state: {:?}",
260 state
261 )
262 }
263 None => {}
264 };
265 self.update_inner(
266 epoch,
267 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
269 );
270 }
271
272 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
275 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
276 return;
277 }
278 tracing::debug!(
279 actor_id = self.backfill_actor_id,
280 ?epoch,
281 current_consumed_rows,
282 "progress finish"
283 );
284 self.update_inner(
285 epoch,
286 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
287 );
288 }
289
290 pub(crate) fn update_create_mview_log_store_progress(
291 &mut self,
292 epoch: EpochPair,
293 pending_epoch_lag: u64,
294 ) {
295 assert_matches!(
296 self.state,
297 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
298 | Some(BackfillState::ConsumingLogStore { .. })
299 | None,
300 "cannot update log store progress at state {:?}",
301 self.state
302 );
303 self.update_inner(
304 epoch,
305 BackfillState::ConsumingLogStore { pending_epoch_lag },
306 );
307 }
308
309 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
310 assert_matches!(
311 self.state,
312 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
313 | Some(BackfillState::ConsumingLogStore { .. })
314 | None,
315 "cannot finish log store progress at state {:?}",
316 self.state
317 );
318 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
319 }
320}
321
322impl LocalBarrierManager {
323 pub(crate) fn register_create_mview_progress(
331 &self,
332 backfill_actor_id: ActorId,
333 ) -> CreateMviewProgressReporter {
334 trace!("register create mview progress: {}", backfill_actor_id);
335 CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
336 }
337}