risingwave_stream/task/barrier_manager/
progress.rs1use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
20
21use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
22use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
23use crate::task::{ActorId, LocalBarrierManager};
24
25type ConsumedEpoch = u64;
26type ConsumedRows = u64;
27
28#[derive(Debug, Clone, Copy)]
29pub(crate) enum BackfillState {
30 ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
31 DoneConsumingUpstreamTableOrSource(ConsumedRows),
32 ConsumingLogStore { pending_epoch_lag: u64 },
33 DoneConsumingLogStore,
34}
35
36impl BackfillState {
37 pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
38 let (done, consumed_epoch, consumed_rows, pending_epoch_lag) = match self {
39 BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
40 (false, consumed_epoch, consumed_rows, 0)
41 }
42 BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
43 (true, 0, consumed_rows, 0)
44 } BackfillState::ConsumingLogStore { pending_epoch_lag } => {
46 (false, 0, 0, pending_epoch_lag)
47 }
48 BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
49 };
50 PbCreateMviewProgress {
51 backfill_actor_id: actor_id,
52 done,
53 consumed_epoch,
54 consumed_rows,
55 pending_epoch_lag,
56 }
57 }
58}
59
60impl Display for BackfillState {
61 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
62 match self {
63 BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
64 write!(
65 f,
66 "ConsumingUpstreamTable(epoch: {}, rows: {})",
67 epoch, rows
68 )
69 }
70 BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
71 write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
72 }
73 BackfillState::ConsumingLogStore { pending_epoch_lag } => {
74 write!(
75 f,
76 "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
77 )
78 }
79 BackfillState::DoneConsumingLogStore => {
80 write!(f, "DoneConsumingLogStore")
81 }
82 }
83 }
84}
85
86impl DatabaseManagedBarrierState {
87 pub(crate) fn update_create_mview_progress(
88 &mut self,
89 epoch: EpochPair,
90 actor: ActorId,
91 state: BackfillState,
92 ) {
93 if let Some(actor_state) = self.actor_states.get(&actor)
94 && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
95 && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
96 {
97 graph_state
98 .create_mview_progress
99 .entry(epoch.curr)
100 .or_default()
101 .insert(actor, state);
102 } else {
103 warn!(?epoch, actor, ?state, "ignore create mview progress");
104 }
105 }
106}
107
108impl LocalBarrierManager {
109 fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
110 self.send_event(ReportCreateProgress {
111 epoch,
112 actor,
113 state,
114 })
115 }
116}
117
118pub struct CreateMviewProgressReporter {
149 barrier_manager: LocalBarrierManager,
150
151 backfill_actor_id: ActorId,
153
154 state: Option<BackfillState>,
155}
156
157impl CreateMviewProgressReporter {
158 pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
159 Self {
160 barrier_manager,
161 backfill_actor_id,
162 state: None,
163 }
164 }
165
166 #[cfg(test)]
167 pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
168 Self::new(barrier_manager, 0)
169 }
170
171 pub fn actor_id(&self) -> u32 {
172 self.backfill_actor_id
173 }
174
175 fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
176 self.state = Some(state);
177 self.barrier_manager
178 .update_create_mview_progress(epoch, self.backfill_actor_id, state);
179 }
180
181 pub fn update(
186 &mut self,
187 epoch: EpochPair,
188 consumed_epoch: ConsumedEpoch,
189 current_consumed_rows: ConsumedRows,
190 ) {
191 match self.state {
192 Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
193 assert!(
194 last <= consumed_epoch,
195 "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
196 last,
197 consumed_epoch
198 );
199 assert!(last_consumed_rows <= current_consumed_rows);
200 }
201 Some(state) => {
202 panic!(
203 "should not update consuming progress at invalid state: {:?}",
204 state
205 )
206 }
207 None => {}
208 };
209 tracing::debug!(
210 actor_id = self.backfill_actor_id,
211 ?epoch,
212 consumed_epoch,
213 current_consumed_rows,
214 "progress update"
215 );
216 self.update_inner(
217 epoch,
218 BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
219 );
220 }
221
222 pub fn update_for_source_backfill(
225 &mut self,
226 epoch: EpochPair,
227 current_consumed_rows: ConsumedRows,
228 ) {
229 match self.state {
230 Some(BackfillState::ConsumingUpstreamTableOrSource(
231 dummy_last_epoch,
232 _last_consumed_rows,
233 )) => {
234 debug_assert_eq!(dummy_last_epoch, 0);
235 }
236 Some(state) => {
237 panic!(
238 "should not update consuming progress at invalid state: {:?}",
239 state
240 )
241 }
242 None => {}
243 };
244 self.update_inner(
245 epoch,
246 BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
248 );
249 }
250
251 pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
254 if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
255 return;
256 }
257 tracing::debug!(
258 actor_id = self.backfill_actor_id,
259 ?epoch,
260 current_consumed_rows,
261 "progress finish"
262 );
263 self.update_inner(
264 epoch,
265 BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
266 );
267 }
268
269 pub(crate) fn update_create_mview_log_store_progress(
270 &mut self,
271 epoch: EpochPair,
272 pending_epoch_lag: u64,
273 ) {
274 assert_matches!(
275 self.state,
276 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
277 | Some(BackfillState::ConsumingLogStore { .. })
278 | None,
279 "cannot update log store progress at state {:?}",
280 self.state
281 );
282 self.update_inner(
283 epoch,
284 BackfillState::ConsumingLogStore { pending_epoch_lag },
285 );
286 }
287
288 pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
289 assert_matches!(
290 self.state,
291 Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
292 | Some(BackfillState::ConsumingLogStore { .. })
293 | None,
294 "cannot finish log store progress at state {:?}",
295 self.state
296 );
297 self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
298 }
299}
300
301impl LocalBarrierManager {
302 pub(crate) fn register_create_mview_progress(
310 &self,
311 backfill_actor_id: ActorId,
312 ) -> CreateMviewProgressReporter {
313 trace!("register create mview progress: {}", backfill_actor_id);
314 CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
315 }
316}