risingwave_stream/task/barrier_manager/
progress.rs1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
20
21use super::LocalBarrierManager;
22use crate::task::ActorId;
23use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
24use crate::task::barrier_manager::managed_state::DatabaseManagedBarrierState;
25
26type ConsumedEpoch = u64;
27type ConsumedRows = u64;
28
29#[derive(Debug, Clone, Copy)]
30pub(crate) enum BackfillState {
31 ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
32 DoneConsumingUpstreamTableOrSource(ConsumedRows),
33 ConsumingLogStore { pending_epoch_lag: u64 },
34 DoneConsumingLogStore,
35}
36
37impl BackfillState {
38 pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
39 let (done, consumed_epoch, consumed_rows, pending_epoch_lag) = match self {
40 BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
41 (false, consumed_epoch, consumed_rows, 0)
42 }
43 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
44 (true, 0, consumed_rows, 0)
45 } BackfillState::ConsumingLogStore { pending_epoch_lag } => {
47 (false, 0, 0, pending_epoch_lag)
48 }
49 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
50 };
51 PbCreateMviewProgress {
52 backfill_actor_id: actor_id,
53 done,
54 consumed_epoch,
55 consumed_rows,
56 pending_epoch_lag,
57 }
58 }
59}
60
61impl Display for BackfillState {
62 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
63 match self {
64 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
65 write!(
66 f,
67 "ConsumingUpstreamTable(epoch: {}, rows: {})",
68 epoch, rows
69 )
70 }
71 BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
72 write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
73 }
74 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
75 write!(
76 f,
77 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
78 )
79 }
80 BackfillState::DoneConsumingLogStore => {
81 write!(f, "DoneConsumingLogStore")
82 }
83 }
84 }
85}
86
87impl DatabaseManagedBarrierState {
88 pub(crate) fn update_create_mview_progress(
89 &mut self,
90 epoch: EpochPair,
91 actor: ActorId,
92 state: BackfillState,
93 ) {
94 if let Some(actor_state) = self.actor_states.get(&actor)
95 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
96 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
97 {
98 graph_state
99 .create_mview_progress
100 .entry(epoch.curr)
101 .or_default()
102 .insert(actor, state);
103 } else {
104 warn!(?epoch, actor, ?state, "ignore create mview progress");
105 }
106 }
107}
108
109impl LocalBarrierManager {
110 fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
111 self.send_event(ReportCreateProgress {
112 epoch,
113 actor,
114 state,
115 })
116 }
117}
118
119pub struct CreateMviewProgressReporter {
150 barrier_manager: LocalBarrierManager,
151
152 backfill_actor_id: ActorId,
154
155 state: Option<BackfillState>,
156}
157
158impl CreateMviewProgressReporter {
159 pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
160 Self {
161 barrier_manager,
162 backfill_actor_id,
163 state: None,
164 }
165 }
166
167 #[cfg(test)]
168 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
169 Self::new(barrier_manager, 0)
170 }
171
172 pub fn actor_id(&self) -> u32 {
173 self.backfill_actor_id
174 }
175
176 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
177 self.state = Some(state);
178 self.barrier_manager
179 .update_create_mview_progress(epoch, self.backfill_actor_id, state);
180 }
181
182 pub fn update(
187 &mut self,
188 epoch: EpochPair,
189 consumed_epoch: ConsumedEpoch,
190 current_consumed_rows: ConsumedRows,
191 ) {
192 match self.state {
193 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
194 assert!(
195 last <= consumed_epoch,
196 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
197 last,
198 consumed_epoch
199 );
200 assert!(last_consumed_rows <= current_consumed_rows);
201 }
202 Some(state) => {
203 panic!(
204 "should not update consuming progress at invalid state: {:?}",
205 state
206 )
207 }
208 None => {}
209 };
210 tracing::debug!(
211 actor_id = self.backfill_actor_id,
212 ?epoch,
213 consumed_epoch,
214 current_consumed_rows,
215 "progress update"
216 );
217 self.update_inner(
218 epoch,
219 BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
220 );
221 }
222
223 pub fn update_for_source_backfill(
226 &mut self,
227 epoch: EpochPair,
228 current_consumed_rows: ConsumedRows,
229 ) {
230 match self.state {
231 Some(BackfillState::ConsumingUpstreamTableOrSource(
232 dummy_last_epoch,
233 _last_consumed_rows,
234 )) => {
235 debug_assert_eq!(dummy_last_epoch, 0);
236 }
237 Some(state) => {
238 panic!(
239 "should not update consuming progress at invalid state: {:?}",
240 state
241 )
242 }
243 None => {}
244 };
245 self.update_inner(
246 epoch,
247 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
249 );
250 }
251
252 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
255 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
256 return;
257 }
258 tracing::debug!(
259 actor_id = self.backfill_actor_id,
260 ?epoch,
261 current_consumed_rows,
262 "progress finish"
263 );
264 self.update_inner(
265 epoch,
266 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
267 );
268 }
269
270 pub(crate) fn update_create_mview_log_store_progress(
271 &mut self,
272 epoch: EpochPair,
273 pending_epoch_lag: u64,
274 ) {
275 assert_matches!(
276 self.state,
277 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
278 | Some(BackfillState::ConsumingLogStore { .. })
279 | None,
280 "cannot update log store progress at state {:?}",
281 self.state
282 );
283 self.update_inner(
284 epoch,
285 BackfillState::ConsumingLogStore { pending_epoch_lag },
286 );
287 }
288
289 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
290 assert_matches!(
291 self.state,
292 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
293 | Some(BackfillState::ConsumingLogStore { .. })
294 | None,
295 "cannot finish log store progress at state {:?}",
296 self.state
297 );
298 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
299 }
300}
301
302impl LocalBarrierManager {
303 pub(crate) fn register_create_mview_progress(
311 &self,
312 backfill_actor_id: ActorId,
313 ) -> CreateMviewProgressReporter {
314 trace!("register create mview progress: {}", backfill_actor_id);
315 CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
316 }
317}