risingwave_stream/task/barrier_manager/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::fmt::{Display, Formatter};

use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

use super::LocalBarrierManager;
use crate::task::ActorId;
use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
use crate::task::barrier_manager::managed_state::DatabaseManagedBarrierState;

type ConsumedEpoch = u64;
type ConsumedRows = u64;

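/// Progress state of a backfill actor. A summary of the legal transitions,
/// derived from the assertions in this file (not enforced by the type system):
/// `ConsumingUpstreamTableOrSource` -> `DoneConsumingUpstreamTableOrSource`
/// -> `ConsumingLogStore` -> `DoneConsumingLogStore`.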
#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
    DoneConsumingUpstreamTableOrSource(ConsumedRows),
    ConsumingLogStore { pending_epoch_lag: u64 },
    DoneConsumingLogStore,
}

impl BackfillState {
    pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
        let (done, consumed_epoch, consumed_rows, pending_epoch_lag) = match self {
            BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
                (false, consumed_epoch, consumed_rows, 0)
            }
            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
                (true, 0, consumed_rows, 0) // unused field for done
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                (false, 0, 0, pending_epoch_lag)
            }
            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
        };
        PbCreateMviewProgress {
            backfill_actor_id: actor_id,
            done,
            consumed_epoch,
            consumed_rows,
            pending_epoch_lag,
        }
    }
}

impl Display for BackfillState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
                write!(
                    f,
                    "ConsumingUpstreamTable(epoch: {}, rows: {})",
                    epoch, rows
                )
            }
            BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
                write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                write!(
                    f,
                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
                )
            }
            BackfillState::DoneConsumingLogStore => {
                write!(f, "DoneConsumingLogStore")
            }
        }
    }
}

impl DatabaseManagedBarrierState {
    pub(crate) fn update_create_mview_progress(
        &mut self,
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    ) {
        // Locate the partial graph that the in-flight barrier at `epoch.prev`
        // belongs to for this actor, and record the progress under `epoch.curr`.
        if let Some(actor_state) = self.actor_states.get(&actor)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .create_mview_progress
                .entry(epoch.curr)
                .or_default()
                .insert(actor, state);
        } else {
            warn!(?epoch, actor, ?state, "ignore create mview progress");
        }
    }
}

impl LocalBarrierManager {
    // Forwards the progress report as an event to the barrier worker, which
    // records it via `DatabaseManagedBarrierState::update_create_mview_progress`.
    fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
        self.send_event(ReportCreateProgress {
            epoch,
            actor,
            state,
        })
    }
}

/// The progress held by the backfill executors to report to the local barrier manager.
///
/// Progress can be computed by
/// `total_rows_consumed` / `total_rows_upstream`.
/// This yields the (approximate) percentage of rows we are done backfilling.
///
/// For `total_rows_consumed`, the progress is tracked in the following way:
/// 1. Fetching the row count from our state table.
///    This number is the total count, NOT incremental.
///    This is done per actor.
/// 2. Refreshing this number on the meta side, on every barrier.
///    This is done by simply summing up the row counts from all the actors.
///
/// For `total_rows_upstream`,
/// this is fetched from `HummockVersion`'s statistics (`TableStats::total_key_count`).
///
/// This is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
/// The `total_key_count` figure just counts the number of storage keys.
/// For example, if a key is inserted and then deleted,
/// it results in two storage entries in the LSM tree, so count = 2.
/// Only after compaction will the count drop back to 0.
///
/// So the total count could be more pessimistic than the actual progress.
///
/// It is fine for this number not to be precise,
/// since we don't rely on it to update the status of a stream job internally.
/// TODO(kwannoel): Perhaps it is possible to get total key count of the replicated state table
/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
/// `row_count` state for it.
pub struct CreateMviewProgressReporter {
    barrier_manager: LocalBarrierManager,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    state: Option<BackfillState>,
}

impl CreateMviewProgressReporter {
    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
        Self {
            barrier_manager,
            backfill_actor_id,
            state: None,
        }
    }

    #[cfg(test)]
    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
        Self::new(barrier_manager, 0)
    }

    pub fn actor_id(&self) -> u32 {
        self.backfill_actor_id
    }

    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
        self.state = Some(state);
        self.barrier_manager
            .update_create_mview_progress(epoch, self.backfill_actor_id, state);
    }

    /// Update the progress to `ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows)`.
    /// The consumed epoch must be monotonically non-decreasing.
    /// `epoch` should be provided to locate the barrier under concurrent checkpoint.
    /// `current_consumed_rows` is an accumulated value.
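    ///
    /// # Example (an illustrative sketch; the real call sites are the backfill executors)
    /// ```ignore
    /// // Once per barrier, report the epoch the snapshot read has reached and
    /// // the total number of rows consumed so far (not a per-barrier delta):
    /// reporter.update(barrier.epoch, snapshot_read_epoch, total_rows_consumed);
    /// ```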
    pub fn update(
        &mut self,
        epoch: EpochPair,
        consumed_epoch: ConsumedEpoch,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
                assert!(
                    last <= consumed_epoch,
                    "consumed epoch {:#?} must not be less than the last reported epoch {:#?}",
                    consumed_epoch,
                    last
                );
                assert!(last_consumed_rows <= current_consumed_rows);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        tracing::debug!(
            actor_id = self.backfill_actor_id,
            ?epoch,
            consumed_epoch,
            current_consumed_rows,
            "progress update"
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
        );
    }

    /// The difference from [`Self::update`] (MV backfill) is that we
    /// don't care about `ConsumedEpoch` here.
    pub fn update_for_source_backfill(
        &mut self,
        epoch: EpochPair,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(
                dummy_last_epoch,
                _last_consumed_rows,
            )) => {
                debug_assert_eq!(dummy_last_epoch, 0);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        self.update_inner(
            epoch,
            // fill a dummy `ConsumedEpoch`
            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
        );
    }

    /// Finish the progress. If the progress is already finished, this is a no-op.
    /// `epoch` should be provided to locate the barrier under concurrent checkpoint.
    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
            return;
        }
        tracing::debug!(
            actor_id = self.backfill_actor_id,
            ?epoch,
            current_consumed_rows,
            "progress finish"
        );
        self.update_inner(
            epoch,
            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
        );
    }

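    /// Report progress of the log store catch-up phase, which may follow the
    /// upstream table/source consumption (see the `assert_matches!` below for
    /// the states this is allowed from). `pending_epoch_lag` is the remaining
    /// epoch lag of the log store consumer.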
    pub(crate) fn update_create_mview_log_store_progress(
        &mut self,
        epoch: EpochPair,
        pending_epoch_lag: u64,
    ) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot update log store progress at state {:?}",
            self.state
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingLogStore { pending_epoch_lag },
        );
    }

    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot finish log store progress at state {:?}",
            self.state
        );
        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
    }
}

impl LocalBarrierManager {
    /// Create a struct for reporting the progress of creating an mview. The backfill
    /// executors should continuously report their progress through it while rearranging
    /// barriers. The updated progress will be collected by the local barrier manager
    /// and reported to the meta service in this epoch.
    ///
    /// When all backfill executors of the creating mview have finished, the creation
    /// will be marked as done on the frontend side and the mview will be exposed to the user.
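    ///
    /// # Example (an illustrative sketch; actual registration happens when actors are built)
    /// ```ignore
    /// let reporter = local_barrier_manager.register_create_mview_progress(actor_id);
    /// // `reporter.update(..)` / `reporter.finish(..)` are then called per barrier.
    /// ```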
    pub(crate) fn register_create_mview_progress(
        &self,
        backfill_actor_id: ActorId,
    ) -> CreateMviewProgressReporter {
        trace!("register create mview progress: {}", backfill_actor_id);
        CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
    }
}