risingwave_stream/task/barrier_manager/
progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
20
21use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
22use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
23use crate::task::cdc_progress::CdcTableBackfillState;
24use crate::task::{ActorId, LocalBarrierManager};
25
/// Epoch that backfill reports as consumed so far; callers must report it monotonically
/// (presumably the upstream epoch the backfill has caught up to — TODO confirm).
type ConsumedEpoch = u64;
/// Accumulated (not incremental) number of rows consumed by backfill.
type ConsumedRows = u64;
/// Number of rows reported as buffered. Locality backfill supplies a real value; the plain
/// `update` / `finish` / source-backfill paths always report `0`.
type BufferedRows = u64;
29
/// Backfill progress of a single actor, as sent to the local barrier manager and
/// eventually converted to a `PbCreateMviewProgress` via [`BackfillState::to_pb`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    /// Still reading the upstream table or source:
    /// `(consumed_epoch, consumed_rows, buffered_rows)`.
    /// Source backfill has no meaningful epoch and always reports `0` here.
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
    /// Finished reading the upstream table or source: `(consumed_rows, buffered_rows)`.
    DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
    /// Consuming the log store; `pending_epoch_lag` is forwarded verbatim to the
    /// protobuf field of the same name.
    ConsumingLogStore { pending_epoch_lag: u64 },
    /// Finished consuming the log store.
    DoneConsumingLogStore,
}
37
38impl BackfillState {
39    pub fn to_pb(self, actor_id: ActorId) -> PbCreateMviewProgress {
40        let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
41            BackfillState::ConsumingUpstreamTableOrSource(
42                consumed_epoch,
43                consumed_rows,
44                buffered_rows,
45            ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
46            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
47                (true, 0, consumed_rows, 0, buffered_rows)
48            }
49            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
50                (false, 0, 0, pending_epoch_lag, 0)
51            }
52            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
53        };
54        PbCreateMviewProgress {
55            backfill_actor_id: actor_id,
56            done,
57            consumed_epoch,
58            consumed_rows,
59            pending_epoch_lag,
60            buffered_rows,
61        }
62    }
63}
64
65impl Display for BackfillState {
66    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
67        match self {
68            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
69                write!(
70                    f,
71                    "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
72                    epoch, rows, buffered
73                )
74            }
75            BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
76                write!(
77                    f,
78                    "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
79                    rows, buffered
80                )
81            }
82            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
83                write!(
84                    f,
85                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
86                )
87            }
88            BackfillState::DoneConsumingLogStore => {
89                write!(f, "DoneConsumingLogStore")
90            }
91        }
92    }
93}
94
95impl DatabaseManagedBarrierState {
96    pub(crate) fn update_create_mview_progress(
97        &mut self,
98        epoch: EpochPair,
99        actor: ActorId,
100        state: BackfillState,
101    ) {
102        if let Some(actor_state) = self.actor_states.get(&actor)
103            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
104            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
105        {
106            graph_state
107                .create_mview_progress
108                .entry(epoch.curr)
109                .or_default()
110                .insert(actor, state);
111        } else {
112            warn!(?epoch, actor, ?state, "ignore create mview progress");
113        }
114    }
115
116    pub(crate) fn update_cdc_table_backfill_progress(
117        &mut self,
118        epoch: EpochPair,
119        actor: ActorId,
120        state: CdcTableBackfillState,
121    ) {
122        if let Some(actor_state) = self.actor_states.get(&actor)
123            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
124            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
125        {
126            graph_state
127                .cdc_table_backfill_progress
128                .entry(epoch.curr)
129                .or_default()
130                .insert(actor, state);
131        } else {
132            warn!(?epoch, actor, ?state, "ignore CDC table backfill progress");
133        }
134    }
135}
136
137impl LocalBarrierManager {
138    fn update_create_mview_progress(&self, epoch: EpochPair, actor: ActorId, state: BackfillState) {
139        self.send_event(ReportCreateProgress {
140            epoch,
141            actor,
142            state,
143        })
144    }
145}
146
/// The progress held by the backfill executors to report to the local barrier manager.
///
/// Progress can be computed as
/// `total_rows_consumed` / `total_rows_upstream`.
/// This yields the (approximate) percentage of rows we are done backfilling.
///
/// For `total_rows_consumed`, the progress is tracked in the following way:
/// 1. Fetching the row count from our state table.
///    This number is the total number, NOT incremental.
///    This is done per actor.
/// 2. Refreshing this number on the meta side, on every barrier.
///    This is done by simply summing up the row counts from all actors.
///
/// For `total_rows_upstream`,
/// this is fetched from `HummockVersion`'s statistics (`TableStats::total_key_count`).
///
/// This is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
/// The `total_key_count` figure just counts the number of storage keys.
/// For example, if a key is inserted and then deleted,
/// it results in two storage entries in the LSM tree, so count=2.
/// Only after compaction will the count drop back to 0.
///
/// So the total count can make the progress look more pessimistic than it actually is.
///
/// It is fine for this number not to be precise,
/// since we don't rely on it to update the status of a stream job internally.
///
/// TODO(kwannoel): Perhaps it is possible to get total key count of the replicated state table
/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
/// `row_count` state for it.
pub struct CreateMviewProgressReporter {
    /// Barrier manager that every progress update is sent to.
    barrier_manager: LocalBarrierManager,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    /// The last state reported through this reporter, or `None` if nothing has been
    /// reported yet. Used to validate the next transition.
    state: Option<BackfillState>,
}
185
186impl CreateMviewProgressReporter {
187    pub fn new(barrier_manager: LocalBarrierManager, backfill_actor_id: ActorId) -> Self {
188        Self {
189            barrier_manager,
190            backfill_actor_id,
191            state: None,
192        }
193    }
194
195    #[cfg(test)]
196    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
197        Self::new(barrier_manager, 0)
198    }
199
200    pub fn actor_id(&self) -> u32 {
201        self.backfill_actor_id
202    }
203
204    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
205        self.state = Some(state);
206        self.barrier_manager
207            .update_create_mview_progress(epoch, self.backfill_actor_id, state);
208    }
209
210    /// Update the progress to `ConsumingUpstream(consumed_epoch, consumed_rows)`. The epoch must be
211    /// monotonically increasing.
212    /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
213    /// `current_consumed_rows` is an accumulated value.
214    pub fn update(
215        &mut self,
216        epoch: EpochPair,
217        consumed_epoch: ConsumedEpoch,
218        current_consumed_rows: ConsumedRows,
219    ) {
220        self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
221    }
222
223    /// Update the progress with buffered rows information.
224    /// This is used by locality backfill to report precise progress including buffered data.
225    pub fn update_with_buffered_rows(
226        &mut self,
227        epoch: EpochPair,
228        consumed_epoch: ConsumedEpoch,
229        current_consumed_rows: ConsumedRows,
230        buffered_rows: BufferedRows,
231    ) {
232        match self.state {
233            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
234                assert!(
235                    last <= consumed_epoch,
236                    "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
237                    last,
238                    consumed_epoch
239                );
240                assert!(last_consumed_rows <= current_consumed_rows);
241            }
242            Some(state) => {
243                panic!(
244                    "should not update consuming progress at invalid state: {:?}",
245                    state
246                )
247            }
248            None => {}
249        };
250        tracing::debug!(
251            actor_id = self.backfill_actor_id,
252            ?epoch,
253            consumed_epoch,
254            current_consumed_rows,
255            buffered_rows,
256            "progress update"
257        );
258        self.update_inner(
259            epoch,
260            BackfillState::ConsumingUpstreamTableOrSource(
261                consumed_epoch,
262                current_consumed_rows,
263                buffered_rows,
264            ),
265        );
266    }
267
268    /// The difference from [`Self::update`] (MV backfill) is that we
269    /// don't care `ConsumedEpoch` here.
270    pub fn update_for_source_backfill(
271        &mut self,
272        epoch: EpochPair,
273        current_consumed_rows: ConsumedRows,
274    ) {
275        match self.state {
276            Some(BackfillState::ConsumingUpstreamTableOrSource(
277                dummy_last_epoch,
278                _last_consumed_rows,
279                _,
280            )) => {
281                debug_assert_eq!(dummy_last_epoch, 0);
282            }
283            Some(state) => {
284                panic!(
285                    "should not update consuming progress at invalid state: {:?}",
286                    state
287                )
288            }
289            None => {}
290        };
291        self.update_inner(
292            epoch,
293            // fill a dummy ConsumedEpoch and no buffered rows
294            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
295        );
296    }
297
298    /// Finish the progress. If the progress is already finished, then perform no-op.
299    /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
300    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
301        self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
302    }
303
304    /// Finish the progress with buffered rows information.
305    /// This is used by locality backfill to report any remaining buffered rows at completion.
306    pub fn finish_with_buffered_rows(
307        &mut self,
308        epoch: EpochPair,
309        current_consumed_rows: ConsumedRows,
310        buffered_rows: BufferedRows,
311    ) {
312        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
313            return;
314        }
315        tracing::debug!(
316            actor_id = self.backfill_actor_id,
317            ?epoch,
318            current_consumed_rows,
319            buffered_rows,
320            "progress finish"
321        );
322        // When finishing, report the consumed rows and buffered rows separately
323        self.update_inner(
324            epoch,
325            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
326        );
327    }
328
329    pub(crate) fn update_create_mview_log_store_progress(
330        &mut self,
331        epoch: EpochPair,
332        pending_epoch_lag: u64,
333    ) {
334        assert_matches!(
335            self.state,
336            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
337                | Some(BackfillState::ConsumingLogStore { .. })
338                | None,
339            "cannot update log store progress at state {:?}",
340            self.state
341        );
342        self.update_inner(
343            epoch,
344            BackfillState::ConsumingLogStore { pending_epoch_lag },
345        );
346    }
347
348    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
349        assert_matches!(
350            self.state,
351            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
352                | Some(BackfillState::ConsumingLogStore { .. })
353                | None,
354            "cannot finish log store progress at state {:?}",
355            self.state
356        );
357        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
358    }
359}
360
361impl LocalBarrierManager {
362    /// Create a struct for reporting the progress of creating mview. The backfill executors should
363    /// report the progress of barrier rearranging continuously using this. The updated progress
364    /// will be collected by the local barrier manager and reported to the meta service in this
365    /// epoch.
366    ///
367    /// When all backfill executors of the creating mview finish, the creation progress will be done at
368    /// frontend and the mview will be exposed to the user.
369    pub(crate) fn register_create_mview_progress(
370        &self,
371        backfill_actor_id: ActorId,
372    ) -> CreateMviewProgressReporter {
373        trace!("register create mview progress: {}", backfill_actor_id);
374        CreateMviewProgressReporter::new(self.clone(), backfill_actor_id)
375    }
376}