// risingwave_stream/task/barrier_manager/progress.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
21
22use crate::executor::ActorContext;
23use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
24use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
25use crate::task::cdc_progress::CdcTableBackfillState;
26use crate::task::{ActorId, LocalBarrierManager};
27
28type ConsumedEpoch = u64;
29type ConsumedRows = u64;
30type BufferedRows = u64;
31
/// The state of a backfill actor, reported to meta via
/// [`PbCreateMviewProgress`] (see [`BackfillState::to_pb`]).
#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    /// Still consuming the upstream table or source:
    /// (last consumed epoch, accumulated consumed rows, buffered rows).
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
    /// Done consuming the upstream table or source:
    /// (accumulated consumed rows, rows still buffered at completion).
    DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
    /// Catching up by consuming the log store; `pending_epoch_lag` is the
    /// remaining epoch lag yet to be consumed.
    ConsumingLogStore { pending_epoch_lag: u64 },
    /// Done consuming the log store.
    DoneConsumingLogStore,
}
39
40impl BackfillState {
41    pub fn to_pb(self, fragment_id: FragmentId, actor_id: ActorId) -> PbCreateMviewProgress {
42        let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
43            BackfillState::ConsumingUpstreamTableOrSource(
44                consumed_epoch,
45                consumed_rows,
46                buffered_rows,
47            ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
48            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
49                (true, 0, consumed_rows, 0, buffered_rows)
50            }
51            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
52                (false, 0, 0, pending_epoch_lag, 0)
53            }
54            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
55        };
56        PbCreateMviewProgress {
57            backfill_actor_id: actor_id,
58            done,
59            consumed_epoch,
60            consumed_rows,
61            pending_epoch_lag,
62            buffered_rows,
63            fragment_id,
64        }
65    }
66}
67
68impl Display for BackfillState {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        match self {
71            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
72                write!(
73                    f,
74                    "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
75                    epoch, rows, buffered
76                )
77            }
78            BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
79                write!(
80                    f,
81                    "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
82                    rows, buffered
83                )
84            }
85            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
86                write!(
87                    f,
88                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
89                )
90            }
91            BackfillState::DoneConsumingLogStore => {
92                write!(f, "DoneConsumingLogStore")
93            }
94        }
95    }
96}
97
98impl DatabaseManagedBarrierState {
99    pub(crate) fn update_create_mview_progress(
100        &mut self,
101        epoch: EpochPair,
102        fragment_id: FragmentId,
103        actor: ActorId,
104        state: BackfillState,
105    ) {
106        if let Some(actor_state) = self.actor_states.get(&actor)
107            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
108            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
109        {
110            if let Some((prev_fragment_id, _)) = graph_state
111                .create_mview_progress
112                .entry(epoch.curr)
113                .or_default()
114                .insert(actor, (fragment_id, state))
115            {
116                assert_eq!(prev_fragment_id, fragment_id)
117            }
118        } else {
119            warn!(?epoch, %actor, ?state, "ignore create mview progress");
120        }
121    }
122
123    pub(crate) fn update_cdc_table_backfill_progress(
124        &mut self,
125        epoch: EpochPair,
126        actor: ActorId,
127        state: CdcTableBackfillState,
128    ) {
129        if let Some(actor_state) = self.actor_states.get(&actor)
130            && let Some(partial_graph_id) = actor_state.inflight_barriers.get(&epoch.prev)
131            && let Some(graph_state) = self.graph_states.get_mut(partial_graph_id)
132        {
133            graph_state
134                .cdc_table_backfill_progress
135                .entry(epoch.curr)
136                .or_default()
137                .insert(actor, state);
138        } else {
139            warn!(?epoch, %actor, ?state, "ignore CDC table backfill progress");
140        }
141    }
142}
143
144impl LocalBarrierManager {
145    fn update_create_mview_progress(
146        &self,
147        epoch: EpochPair,
148        fragment_id: FragmentId,
149        actor: ActorId,
150        state: BackfillState,
151    ) {
152        self.send_event(ReportCreateProgress {
153            epoch,
154            fragment_id,
155            actor,
156            state,
157        })
158    }
159}
160
161/// The progress held by the backfill executors to report to the local barrier manager.
162///
163/// Progress can be computed by
164/// `total_rows_consumed` / `total_rows_upstream`.
165/// This yields the (approximate) percentage of rows we are done backfilling.
166///
167/// For `total_rows_consumed`, the progress is tracked in the following way:
168/// 1. Fetching the row count from our state table.
169///    This number is the total number, NOT incremental.
170///    This is done per actor.
171/// 2. Refreshing this number on the meta side, on every barrier.
172///    This is done by just summing up all the row counts from the actors.
173///
174/// For `total_rows_upstream`,
175/// this is fetched from `HummockVersion`'s statistics (`TableStats::total_key_count`).
176///
177/// This is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
178/// The `total_key_count` figure just counts the number of storage keys.
179/// For example, if a key is inserted and then deleted,
180/// it results two storage entries in `LSMt`, so count=2.
181/// Only after compaction, the count will drop back to 0.
182///
183/// So the total count could be more pessimistic, than actual progress.
184///
185/// It is fine for this number not to be precise,
186/// since we don't rely on it to update the status of a stream job internally.
187///
188/// TODO(kwannoel): Perhaps it is possible to get total key count of the replicated state table
189/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
190/// `row_count` state for it.
pub struct CreateMviewProgressReporter {
    // Handle used to send progress events to the local barrier worker.
    barrier_manager: LocalBarrierManager,

    // The fragment containing the backfill executors.
    fragment_id: FragmentId,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    // The last state reported via `update_inner`; `None` until the first
    // report. Used to validate state transitions on subsequent updates.
    state: Option<BackfillState>,
}
201
202impl CreateMviewProgressReporter {
203    pub fn new(
204        barrier_manager: LocalBarrierManager,
205        fragment_id: FragmentId,
206        backfill_actor_id: ActorId,
207    ) -> Self {
208        Self {
209            barrier_manager,
210            fragment_id,
211            backfill_actor_id,
212            state: None,
213        }
214    }
215
216    #[cfg(test)]
217    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
218        Self::new(barrier_manager, 0.into(), 0.into())
219    }
220
221    pub fn actor_id(&self) -> ActorId {
222        self.backfill_actor_id
223    }
224
225    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
226        self.state = Some(state);
227        self.barrier_manager.update_create_mview_progress(
228            epoch,
229            self.fragment_id,
230            self.backfill_actor_id,
231            state,
232        );
233    }
234
235    /// Update the progress to `ConsumingUpstream(consumed_epoch, consumed_rows)`. The epoch must be
236    /// monotonically increasing.
237    /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
238    /// `current_consumed_rows` is an accumulated value.
239    pub fn update(
240        &mut self,
241        epoch: EpochPair,
242        consumed_epoch: ConsumedEpoch,
243        current_consumed_rows: ConsumedRows,
244    ) {
245        self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
246    }
247
248    /// Update the progress with buffered rows information.
249    /// This is used by locality backfill to report precise progress including buffered data.
250    pub fn update_with_buffered_rows(
251        &mut self,
252        epoch: EpochPair,
253        consumed_epoch: ConsumedEpoch,
254        current_consumed_rows: ConsumedRows,
255        buffered_rows: BufferedRows,
256    ) {
257        match self.state {
258            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
259                assert!(
260                    last <= consumed_epoch,
261                    "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
262                    last,
263                    consumed_epoch
264                );
265                assert!(last_consumed_rows <= current_consumed_rows);
266            }
267            Some(state) => {
268                panic!(
269                    "should not update consuming progress at invalid state: {:?}",
270                    state
271                )
272            }
273            None => {}
274        };
275        tracing::debug!(
276            actor_id = %self.backfill_actor_id,
277            ?epoch,
278            consumed_epoch,
279            current_consumed_rows,
280            buffered_rows,
281            "progress update"
282        );
283        self.update_inner(
284            epoch,
285            BackfillState::ConsumingUpstreamTableOrSource(
286                consumed_epoch,
287                current_consumed_rows,
288                buffered_rows,
289            ),
290        );
291    }
292
293    /// The difference from [`Self::update`] (MV backfill) is that we
294    /// don't care `ConsumedEpoch` here.
295    pub fn update_for_source_backfill(
296        &mut self,
297        epoch: EpochPair,
298        current_consumed_rows: ConsumedRows,
299    ) {
300        match self.state {
301            Some(BackfillState::ConsumingUpstreamTableOrSource(
302                dummy_last_epoch,
303                _last_consumed_rows,
304                _,
305            )) => {
306                debug_assert_eq!(dummy_last_epoch, 0);
307            }
308            Some(state) => {
309                panic!(
310                    "should not update consuming progress at invalid state: {:?}",
311                    state
312                )
313            }
314            None => {}
315        };
316        self.update_inner(
317            epoch,
318            // fill a dummy ConsumedEpoch and no buffered rows
319            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
320        );
321    }
322
323    /// Finish the progress. If the progress is already finished, then perform no-op.
324    /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
325    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
326        self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
327    }
328
329    /// Finish the progress with buffered rows information.
330    /// This is used by locality backfill to report any remaining buffered rows at completion.
331    pub fn finish_with_buffered_rows(
332        &mut self,
333        epoch: EpochPair,
334        current_consumed_rows: ConsumedRows,
335        buffered_rows: BufferedRows,
336    ) {
337        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
338            return;
339        }
340        tracing::debug!(
341            actor_id = %self.backfill_actor_id,
342            ?epoch,
343            current_consumed_rows,
344            buffered_rows,
345            "progress finish"
346        );
347        // When finishing, report the consumed rows and buffered rows separately
348        self.update_inner(
349            epoch,
350            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
351        );
352    }
353
354    pub(crate) fn update_create_mview_log_store_progress(
355        &mut self,
356        epoch: EpochPair,
357        pending_epoch_lag: u64,
358    ) {
359        assert_matches!(
360            self.state,
361            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
362                | Some(BackfillState::ConsumingLogStore { .. })
363                | None,
364            "cannot update log store progress at state {:?}",
365            self.state
366        );
367        self.update_inner(
368            epoch,
369            BackfillState::ConsumingLogStore { pending_epoch_lag },
370        );
371    }
372
373    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
374        assert_matches!(
375            self.state,
376            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
377                | Some(BackfillState::ConsumingLogStore { .. })
378                | None,
379            "cannot finish log store progress at state {:?}",
380            self.state
381        );
382        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
383    }
384}
385
386impl LocalBarrierManager {
387    /// Create a struct for reporting the progress of creating mview. The backfill executors should
388    /// report the progress of barrier rearranging continuously using this. The updated progress
389    /// will be collected by the local barrier manager and reported to the meta service in this
390    /// epoch.
391    ///
392    /// When all backfill executors of the creating mview finish, the creation progress will be done at
393    /// frontend and the mview will be exposed to the user.
394    pub(crate) fn register_create_mview_progress(
395        &self,
396        actor_ctx: &ActorContext,
397    ) -> CreateMviewProgressReporter {
398        let fragment_id = actor_ctx.fragment_id;
399        let backfill_actor_id = actor_ctx.id;
400        trace!(%fragment_id, %backfill_actor_id, "register create mview progress");
401        CreateMviewProgressReporter::new(self.clone(), fragment_id, backfill_actor_id)
402    }
403}