risingwave_stream/task/barrier_manager/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::fmt::{Display, Formatter};

use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
use crate::task::cdc_progress::CdcTableBackfillState;
use crate::task::{ActorId, LocalBarrierManager};

type ConsumedEpoch = u64;
type ConsumedRows = u64;

/// Progress state of a backfill actor, reported to the local barrier manager
/// and forwarded to the meta service. The state advances monotonically:
/// consuming the upstream table or source, done consuming it, and, for jobs
/// that also replay a log store, consuming the log store and finally done.
#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
    DoneConsumingUpstreamTableOrSource(ConsumedRows),
    ConsumingLogStore { pending_epoch_lag: u64 },
    DoneConsumingLogStore,
}

impl BackfillState {
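    /// Convert this state into the protobuf message reported to the meta
    /// service. Fields that are irrelevant to a variant are zeroed.
    ///
    /// A minimal sketch of the mapping (illustrative values):
    ///
    /// ```ignore
    /// let pb = BackfillState::DoneConsumingUpstreamTableOrSource(42).to_pb(1);
    /// assert!(pb.done);
    /// assert_eq!(pb.consumed_rows, 42);
    /// assert_eq!(pb.consumed_epoch, 0); // unused once done
    /// ```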
    pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
        let (done, consumed_epoch, consumed_rows, pending_epoch_lag) = match self {
            BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
                (false, consumed_epoch, consumed_rows, 0)
            }
            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
                // `consumed_epoch` is unused once backfill is done.
                (true, 0, consumed_rows, 0)
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                (false, 0, 0, pending_epoch_lag)
            }
            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
        };
        PbCreateMviewProgress {
            backfill_actor_id: actor_id,
            done,
            consumed_epoch,
            consumed_rows,
            pending_epoch_lag,
        }
    }
}

impl Display for BackfillState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
                write!(
                    f,
                    "ConsumingUpstreamTable(epoch: {}, rows: {})",
                    epoch, rows
                )
            }
            BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
                write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                write!(
                    f,
                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
                )
            }
            BackfillState::DoneConsumingLogStore => {
                write!(f, "DoneConsumingLogStore")
            }
        }
    }
}

impl DatabaseManagedBarrierState {
    /// Record the backfill progress reported by `actor` for the barrier at
    /// `epoch`, so it is attached to the barrier-complete response of the
    /// corresponding partial graph. Reports for unknown actors or barriers
    /// are ignored with a warning.
    pub(crate) fn update_create_mview_progress(
        &mut self,
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .create_mview_progress
                .entry(epoch.curr)
                .or_default()
                .insert(actor, state);
        } else {
            warn!(?epoch, actor, ?state, "ignore create mview progress");
        }
    }

    /// Same as [`Self::update_create_mview_progress`], but for the progress
    /// of CDC table backfill.
    pub(crate) fn update_cdc_table_backfill_progress(
        &mut self,
        epoch: EpochPair,
        actor: ActorId,
        state: CdcTableBackfillState,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .cdc_table_backfill_progress
                .entry(epoch.curr)
                .or_default()
                .insert(actor, state);
        } else {
            warn!(?epoch, actor, ?state, "ignore CDC table backfill progress");
        }
    }
}

impl LocalBarrierManager {
    fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
        self.send_event(ReportCreateProgress {
            epoch,
            actor,
            state,
        })
    }
}

/// The progress held by the backfill executors to report to the local barrier manager.
///
/// Progress can be computed as
/// `total_rows_consumed` / `total_rows_upstream`,
/// which yields the (approximate) fraction of rows we are done backfilling.
///
/// `total_rows_consumed` is tracked in the following way:
/// 1. Fetching the row count from our state table.
///    This number is the running total, NOT an increment.
///    This is done per actor.
/// 2. Refreshing this number on the meta side, on every barrier.
///    This is done by summing up the row counts from all the actors.
///
/// `total_rows_upstream` is fetched from `HummockVersion`'s statistics
/// (`TableStats::total_key_count`).
///
/// It is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
/// The `total_key_count` figure just counts the number of storage keys.
/// For example, if a key is inserted and then deleted,
/// it results in two storage entries in the LSM tree, so the count is 2.
/// Only after compaction will the count drop back to 0.
///
/// So the total count can be pessimistic, making progress appear lower than it actually is.
///
/// It is fine for this number not to be precise,
/// since we don't rely on it to update the status of a stream job internally.
///
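/// For illustration (hypothetical numbers): if the actors together report
/// `total_rows_consumed = 3_000` and the current `HummockVersion` reports
/// `total_key_count = 12_000` for the upstream table, the displayed progress
/// is 3_000 / 12_000 = 25%, even though compaction may later shrink the
/// denominator and make the same consumed count look further along.
///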
/// TODO(kwannoel): Perhaps it is possible to get total key count of the replicated state table
/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
/// `row_count` state for it.
pub struct CreateMviewProgressReporter {
    barrier_manager: LocalBarrierManager,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    state: Option<BackfillState>,
}

impl CreateMviewProgressReporter {
    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
        Self {
            barrier_manager,
            backfill_actor_id,
            state: None,
        }
    }

    #[cfg(test)]
    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
        Self::new(barrier_manager, 0)
    }

    pub fn actor_id(&self) -> u32 {
        self.backfill_actor_id
    }

    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
        self.state = Some(state);
        self.barrier_manager
            .update_create_mview_progress(epoch, self.backfill_actor_id, state);
    }

    /// Update the progress to `ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows)`.
    /// The `consumed_epoch` must be monotonically increasing.
    /// `epoch` should be provided to locate the barrier under concurrent checkpoint.
    /// `current_consumed_rows` is an accumulated value.
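    ///
    /// A hypothetical call site (identifiers are illustrative, not from this crate):
    ///
    /// ```ignore
    /// // After yielding a chunk backfilled from the upstream snapshot:
    /// reporter.update(barrier.epoch, snapshot_read_epoch, total_backfilled_rows);
    /// ```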
    pub fn update(
        &mut self,
        epoch: EpochPair,
        consumed_epoch: ConsumedEpoch,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
                assert!(
                    last <= consumed_epoch,
                    "last_epoch: {:#?} must not be greater than consumed epoch: {:#?}",
                    last,
                    consumed_epoch
                );
                assert!(last_consumed_rows <= current_consumed_rows);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        tracing::debug!(
            actor_id = self.backfill_actor_id,
            ?epoch,
            consumed_epoch,
            current_consumed_rows,
            "progress update"
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
        );
    }

    /// The difference from [`Self::update`] (MV backfill) is that we
    /// don't care about `ConsumedEpoch` here.
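    ///
    /// A hypothetical call site (identifiers are illustrative, not from this crate):
    ///
    /// ```ignore
    /// // Source backfill only tracks how many rows have been backfilled.
    /// reporter.update_for_source_backfill(barrier.epoch, total_backfilled_rows);
    /// ```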
    pub fn update_for_source_backfill(
        &mut self,
        epoch: EpochPair,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(
                dummy_last_epoch,
                _last_consumed_rows,
            )) => {
                debug_assert_eq!(dummy_last_epoch, 0);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        self.update_inner(
            epoch,
            // fill a dummy `ConsumedEpoch`
            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
        );
    }

    /// Finish the progress. If the progress is already finished, this is a no-op.
    /// `epoch` should be provided to locate the barrier under concurrent checkpoint.
    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
            return;
        }
        tracing::debug!(
            actor_id = self.backfill_actor_id,
            ?epoch,
            current_consumed_rows,
            "progress finish"
        );
        self.update_inner(
            epoch,
            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
        );
    }

    /// Report the lag of log store consumption for the creating job. Valid only
    /// when no progress has been reported yet, after upstream consumption is
    /// done, or while already consuming the log store.
    pub(crate) fn update_create_mview_log_store_progress(
        &mut self,
        epoch: EpochPair,
        pending_epoch_lag: u64,
    ) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot update log store progress at state {:?}",
            self.state
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingLogStore { pending_epoch_lag },
        );
    }

    /// Mark log store consumption as finished, completing the whole backfill
    /// progress.
    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot finish log store progress at state {:?}",
            self.state
        );
        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
    }
}

impl LocalBarrierManager {
    /// Create a struct for reporting the progress of creating an mview. The backfill executors
    /// should continuously report their progress through it. The updated progress will be
    /// collected by the local barrier manager and reported to the meta service in this epoch.
    ///
    /// When all backfill executors of the creating mview finish, the creation is marked as done
    /// on the frontend and the mview is exposed to the user.
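    ///
    /// A minimal lifecycle sketch (identifiers are illustrative, not from this crate):
    ///
    /// ```ignore
    /// let mut reporter = barrier_manager.register_create_mview_progress(actor_id);
    /// // While backfilling: report on every barrier.
    /// reporter.update(barrier.epoch, snapshot_read_epoch, total_backfilled_rows);
    /// // Once the upstream snapshot is fully consumed:
    /// reporter.finish(barrier.epoch, total_backfilled_rows);
    /// ```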
    pub(crate) fn register_create_mview_progress(
        &self,
        backfill_actor_id: ActorId,
    ) -> CreateMviewProgressReporter {
        trace!("register create mview progress: {}", backfill_actor_id);
        CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
    }
}
337}