risingwave_stream/task/barrier_manager/progress.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::fmt::{Display, Formatter};

use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
use crate::task::{ActorId, LocalBarrierManager};

type ConsumedEpoch = u64;
type ConsumedRows = u64;

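/// The backfill progress of an actor, as reported to the local barrier manager.
///
/// The typical progression is `ConsumingUpstreamTableOrSource` ->
/// `DoneConsumingUpstreamTableOrSource`; jobs that drain a log store afterwards
/// additionally go through `ConsumingLogStore` -> `DoneConsumingLogStore`.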
#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows),
    DoneConsumingUpstreamTableOrSource(ConsumedRows),
    ConsumingLogStore { pending_epoch_lag: u64 },
    DoneConsumingLogStore,
}

impl BackfillState {
    pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
        let (done, consumed_epoch, consumed_rows, pending_epoch_lag) = match self {
            BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows) => {
                (false, consumed_epoch, consumed_rows, 0)
            }
            // `consumed_epoch` is unused once backfill is done, so a dummy 0 is filled in.
            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows) => {
                (true, 0, consumed_rows, 0)
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                (false, 0, 0, pending_epoch_lag)
            }
            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0),
        };
        PbCreateMviewProgress {
            backfill_actor_id: actor_id,
            done,
            consumed_epoch,
            consumed_rows,
            pending_epoch_lag,
        }
    }
}

impl Display for BackfillState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows) => {
                write!(
                    f,
                    "ConsumingUpstreamTable(epoch: {}, rows: {})",
                    epoch, rows
                )
            }
            BackfillState::DoneConsumingUpstreamTableOrSource(rows) => {
                write!(f, "DoneConsumingUpstreamTable(rows: {})", rows)
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                write!(
                    f,
                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
                )
            }
            BackfillState::DoneConsumingLogStore => {
                write!(f, "DoneConsumingLogStore")
            }
        }
    }
}

impl DatabaseManagedBarrierState {
    pub(crate) fn update_create_mview_progress(
        &mut self,
        epoch: EpochPair,
        actor: ActorId,
        state: BackfillState,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor)
            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
        {
            graph_state
                .create_mview_progress
                .entry(epoch.curr)
                .or_default()
                .insert(actor, state);
        } else {
            // The actor may have been dropped, or the barrier may have already been
            // collected; in either case the report is stale and can be skipped.
            warn!(?epoch, actor, ?state, "ignore create mview progress");
        }
    }
}

impl LocalBarrierManager {
    fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
        self.send_event(ReportCreateProgress {
            epoch,
            actor,
            state,
        })
    }
}

/// The progress held by the backfill executors to report to the local barrier manager.
///
/// Progress can be computed as `total_rows_consumed` / `total_rows_upstream`,
/// which yields the (approximate) percentage of rows we are done backfilling.
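///
/// For example (with illustrative numbers): if the actors have together
/// consumed 2,500,000 rows and the upstream reports 10,000,000 keys,
/// the progress is 2,500,000 / 10,000,000 = 25%.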
///
/// For `total_rows_consumed`, the progress is tracked in the following way:
/// 1. Fetching the row count from our state table.
///    This number is the total number, NOT incremental.
///    This is done per actor.
/// 2. Refreshing this number on the meta side, on every barrier.
///    This is done by simply summing up the row counts from all the actors.
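///
/// For example (with illustrative numbers): if three backfill actors report
/// row counts of 100, 250, and 150, the meta side records a total of 500
/// consumed rows for that barrier.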
///
/// For `total_rows_upstream`,
/// this is fetched from `HummockVersion`'s statistics (`TableStats::total_key_count`).
///
/// This is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
/// The `total_key_count` figure just counts the number of storage keys.
/// For example, if a key is inserted and then deleted,
/// it results in two storage entries in the LSM tree, so the count is 2.
/// Only after compaction does the count drop back to 0.
///
/// So the total count could be more pessimistic than the actual progress.
///
/// It is fine for this number not to be precise,
/// since we don't rely on it to update the status of a stream job internally.
///
/// TODO(kwannoel): Perhaps it is possible to get the total key count of the replicated state table
/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
/// `row_count` state for it.
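///
/// A minimal sketch of how a backfill executor might drive this reporter; the
/// variable names (`barrier`, `snapshot_epoch`, `total_rows_consumed`,
/// `pending_epoch_lag`) are illustrative, and the log store steps only apply
/// to jobs that consume one:
///
/// ```ignore
/// let mut reporter = barrier_manager.register_create_mview_progress(actor_id);
/// // While consuming the upstream snapshot, report on every barrier:
/// reporter.update(barrier.epoch, snapshot_epoch, total_rows_consumed);
/// // Once the snapshot is fully consumed:
/// reporter.finish(barrier.epoch, total_rows_consumed);
/// // Then, if the job drains a log store:
/// reporter.update_create_mview_log_store_progress(barrier.epoch, pending_epoch_lag);
/// reporter.finish_consuming_log_store(barrier.epoch);
/// ```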
pub struct CreateMviewProgressReporter {
    barrier_manager: LocalBarrierManager,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    /// The latest progress state reported by this actor, if any.
    state: Option<BackfillState>,
}

impl CreateMviewProgressReporter {
    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
        Self {
            barrier_manager,
            backfill_actor_id,
            state: None,
        }
    }

    #[cfg(test)]
    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
        Self::new(barrier_manager, 0)
    }

    pub fn actor_id(&self) -> u32 {
        self.backfill_actor_id
    }

    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
        self.state = Some(state);
        self.barrier_manager
            .update_create_mview_progress(epoch, self.backfill_actor_id, state);
    }

    /// Update the progress to `ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows)`.
    /// The `consumed_epoch` must be monotonically increasing.
    /// `epoch` should be provided to locate the barrier under concurrent checkpoint.
    /// `current_consumed_rows` is an accumulated value.
    pub fn update(
        &mut self,
        epoch: EpochPair,
        consumed_epoch: ConsumedEpoch,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows)) => {
                assert!(
                    last <= consumed_epoch,
                    "last epoch: {:#?} must not be greater than consumed epoch: {:#?}",
                    last,
                    consumed_epoch
                );
                assert!(last_consumed_rows <= current_consumed_rows);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        tracing::debug!(
            actor_id = self.backfill_actor_id,
            ?epoch,
            consumed_epoch,
            current_consumed_rows,
            "progress update"
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingUpstreamTableOrSource(consumed_epoch, current_consumed_rows),
        );
    }

    /// The difference from [`Self::update`] (MV backfill) is that we
    /// don't care about `ConsumedEpoch` here.
    pub fn update_for_source_backfill(
        &mut self,
        epoch: EpochPair,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(
                dummy_last_epoch,
                _last_consumed_rows,
            )) => {
                debug_assert_eq!(dummy_last_epoch, 0);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        self.update_inner(
            epoch,
            // fill a dummy `ConsumedEpoch`
            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows),
        );
    }

    /// Finish the progress. If the progress is already finished, this is a no-op.
    /// `epoch` should be provided to locate the barrier under concurrent checkpoint.
    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_)) = self.state {
            return;
        }
        tracing::debug!(
            actor_id = self.backfill_actor_id,
            ?epoch,
            current_consumed_rows,
            "progress finish"
        );
        self.update_inner(
            epoch,
            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows),
        );
    }

    pub(crate) fn update_create_mview_log_store_progress(
        &mut self,
        epoch: EpochPair,
        pending_epoch_lag: u64,
    ) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot update log store progress at state {:?}",
            self.state
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingLogStore { pending_epoch_lag },
        );
    }

    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot finish log store progress at state {:?}",
            self.state
        );
        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
    }
}

impl LocalBarrierManager {
    /// Create a struct for reporting the progress of creating an mview. The backfill
    /// executors should use this to continuously report their progress on each barrier.
    /// The updated progress will be collected by the local barrier manager and reported
    /// to the meta service in this epoch.
    ///
    /// When all backfill executors of the creating mview have finished, the creation is
    /// marked as done on the frontend and the mview is exposed to the user.
    pub(crate) fn register_create_mview_progress(
        &self,
        backfill_actor_id: ActorId,
    ) -> CreateMviewProgressReporter {
        trace!("register create mview progress: {}", backfill_actor_id);
        CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
    }
}