risingwave_stream/task/barrier_manager/
progress.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::fmt::{Display, Formatter};
17
18use risingwave_common::util::epoch::EpochPair;
19use risingwave_pb::id::FragmentId;
20use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;
21
22use crate::executor::ActorContext;
23use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
24use crate::task::barrier_worker::managed_state::DatabaseManagedBarrierState;
25use crate::task::cdc_progress::CdcTableBackfillState;
26use crate::task::{ActorId, LocalBarrierManager};
27
/// The epoch up to which the backfill executor has consumed upstream data.
type ConsumedEpoch = u64;
/// Accumulated (not incremental) number of rows consumed by the backfill executor.
type ConsumedRows = u64;
/// Number of rows buffered but not yet emitted (used by locality backfill).
type BufferedRows = u64;
31
#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    /// Still consuming the upstream table/source snapshot:
    /// (consumed epoch, accumulated consumed rows, buffered rows).
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
    /// Finished consuming the upstream table/source:
    /// (final accumulated consumed rows, remaining buffered rows).
    DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
    /// Catching up on the log store after the snapshot phase;
    /// `pending_epoch_lag` is how far behind the log store consumption is.
    ConsumingLogStore { pending_epoch_lag: u64 },
    /// Finished consuming the log store; backfill is fully complete.
    DoneConsumingLogStore,
}
39
40impl BackfillState {
41    pub fn to_pb(self, fragment_id: FragmentId, actor_id: ActorId) -> PbCreateMviewProgress {
42        let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
43            BackfillState::ConsumingUpstreamTableOrSource(
44                consumed_epoch,
45                consumed_rows,
46                buffered_rows,
47            ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
48            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
49                (true, 0, consumed_rows, 0, buffered_rows)
50            }
51            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
52                (false, 0, 0, pending_epoch_lag, 0)
53            }
54            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
55        };
56        PbCreateMviewProgress {
57            backfill_actor_id: actor_id,
58            done,
59            consumed_epoch,
60            consumed_rows,
61            pending_epoch_lag,
62            buffered_rows,
63            fragment_id,
64        }
65    }
66}
67
68impl Display for BackfillState {
69    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
70        match self {
71            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
72                write!(
73                    f,
74                    "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
75                    epoch, rows, buffered
76                )
77            }
78            BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
79                write!(
80                    f,
81                    "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
82                    rows, buffered
83                )
84            }
85            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
86                write!(
87                    f,
88                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
89                )
90            }
91            BackfillState::DoneConsumingLogStore => {
92                write!(f, "DoneConsumingLogStore")
93            }
94        }
95    }
96}
97
98impl DatabaseManagedBarrierState {
99    pub(crate) fn update_create_mview_progress(
100        &mut self,
101        epoch: EpochPair,
102        fragment_id: FragmentId,
103        actor: ActorId,
104        state: BackfillState,
105    ) {
106        if let Some(actor_state) = self.actor_states.get(&actor)
107            && let Some(graph_state) = self.graph_states.get_mut(&actor_state.partial_graph_id)
108        {
109            if let Some((prev_fragment_id, _)) = graph_state
110                .create_mview_progress
111                .entry(epoch.curr)
112                .or_default()
113                .insert(actor, (fragment_id, state))
114            {
115                assert_eq!(prev_fragment_id, fragment_id)
116            }
117        } else {
118            warn!(?epoch, %actor, ?state, "ignore create mview progress");
119        }
120    }
121
122    pub(crate) fn update_cdc_table_backfill_progress(
123        &mut self,
124        epoch: EpochPair,
125        actor: ActorId,
126        state: CdcTableBackfillState,
127    ) {
128        if let Some(actor_state) = self.actor_states.get(&actor)
129            && let Some(graph_state) = self.graph_states.get_mut(&actor_state.partial_graph_id)
130        {
131            graph_state
132                .cdc_table_backfill_progress
133                .entry(epoch.curr)
134                .or_default()
135                .insert(actor, state);
136        } else {
137            warn!(?epoch, %actor, ?state, "ignore CDC table backfill progress");
138        }
139    }
140}
141
142impl LocalBarrierManager {
143    fn update_create_mview_progress(
144        &self,
145        epoch: EpochPair,
146        fragment_id: FragmentId,
147        actor: ActorId,
148        state: BackfillState,
149    ) {
150        self.send_event(ReportCreateProgress {
151            epoch,
152            fragment_id,
153            actor,
154            state,
155        })
156    }
157}
158
/// The progress held by the backfill executors to report to the local barrier manager.
///
/// Progress can be computed by
/// `total_rows_consumed` / `total_rows_upstream`.
/// This yields the (approximate) percentage of rows we are done backfilling.
///
/// For `total_rows_consumed`, the progress is tracked in the following way:
/// 1. Fetching the row count from our state table.
///    This number is the total number, NOT incremental.
///    This is done per actor.
/// 2. Refreshing this number on the meta side, on every barrier.
///    This is done by just summing up all the row counts from the actors.
///
/// For `total_rows_upstream`,
/// this is fetched from `HummockVersion`'s statistics (`TableStats::total_key_count`).
///
/// This is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
/// The `total_key_count` figure just counts the number of storage keys.
/// For example, if a key is inserted and then deleted,
/// it results in two storage entries in the LSM tree, so count=2.
/// Only after compaction, the count will drop back to 0.
///
/// So the total count could be more pessimistic than actual progress.
///
/// It is fine for this number not to be precise,
/// since we don't rely on it to update the status of a stream job internally.
///
/// TODO(kwannoel): Perhaps it is possible to get total key count of the replicated state table
/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
/// `row_count` state for it.
pub struct CreateMviewProgressReporter {
    // Handle used to send progress events to the barrier worker.
    barrier_manager: LocalBarrierManager,

    // The fragment that the backfill actor belongs to.
    fragment_id: FragmentId,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    // Last reported state; `None` until the first report. Used to validate
    // that state transitions are legal (e.g. monotonically increasing rows).
    state: Option<BackfillState>,
}
199
200impl CreateMviewProgressReporter {
201    pub fn new(
202        barrier_manager: LocalBarrierManager,
203        fragment_id: FragmentId,
204        backfill_actor_id: ActorId,
205    ) -> Self {
206        Self {
207            barrier_manager,
208            fragment_id,
209            backfill_actor_id,
210            state: None,
211        }
212    }
213
214    #[cfg(test)]
215    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
216        Self::new(barrier_manager, 0.into(), 0.into())
217    }
218
219    pub fn actor_id(&self) -> ActorId {
220        self.backfill_actor_id
221    }
222
223    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
224        self.state = Some(state);
225        self.barrier_manager.update_create_mview_progress(
226            epoch,
227            self.fragment_id,
228            self.backfill_actor_id,
229            state,
230        );
231    }
232
233    /// Update the progress to `ConsumingUpstream(consumed_epoch, consumed_rows)`. The epoch must be
234    /// monotonically increasing.
235    /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
236    /// `current_consumed_rows` is an accumulated value.
237    pub fn update(
238        &mut self,
239        epoch: EpochPair,
240        consumed_epoch: ConsumedEpoch,
241        current_consumed_rows: ConsumedRows,
242    ) {
243        self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
244    }
245
246    /// Update the progress with buffered rows information.
247    /// This is used by locality backfill to report precise progress including buffered data.
248    pub fn update_with_buffered_rows(
249        &mut self,
250        epoch: EpochPair,
251        consumed_epoch: ConsumedEpoch,
252        current_consumed_rows: ConsumedRows,
253        buffered_rows: BufferedRows,
254    ) {
255        match self.state {
256            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
257                assert!(
258                    last <= consumed_epoch,
259                    "last_epoch: {:#?} must be greater than consumed epoch: {:#?}",
260                    last,
261                    consumed_epoch
262                );
263                assert!(last_consumed_rows <= current_consumed_rows);
264            }
265            Some(state) => {
266                panic!(
267                    "should not update consuming progress at invalid state: {:?}",
268                    state
269                )
270            }
271            None => {}
272        };
273        tracing::debug!(
274            actor_id = %self.backfill_actor_id,
275            ?epoch,
276            consumed_epoch,
277            current_consumed_rows,
278            buffered_rows,
279            "progress update"
280        );
281        self.update_inner(
282            epoch,
283            BackfillState::ConsumingUpstreamTableOrSource(
284                consumed_epoch,
285                current_consumed_rows,
286                buffered_rows,
287            ),
288        );
289    }
290
291    /// The difference from [`Self::update`] (MV backfill) is that we
292    /// don't care `ConsumedEpoch` here.
293    pub fn update_for_source_backfill(
294        &mut self,
295        epoch: EpochPair,
296        current_consumed_rows: ConsumedRows,
297    ) {
298        match self.state {
299            Some(BackfillState::ConsumingUpstreamTableOrSource(
300                dummy_last_epoch,
301                _last_consumed_rows,
302                _,
303            )) => {
304                debug_assert_eq!(dummy_last_epoch, 0);
305            }
306            Some(state) => {
307                panic!(
308                    "should not update consuming progress at invalid state: {:?}",
309                    state
310                )
311            }
312            None => {}
313        };
314        self.update_inner(
315            epoch,
316            // fill a dummy ConsumedEpoch and no buffered rows
317            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
318        );
319    }
320
321    /// Finish the progress. If the progress is already finished, then perform no-op.
322    /// `current_epoch` should be provided to locate the barrier under concurrent checkpoint.
323    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
324        self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
325    }
326
327    /// Finish the progress with buffered rows information.
328    /// This is used by locality backfill to report any remaining buffered rows at completion.
329    pub fn finish_with_buffered_rows(
330        &mut self,
331        epoch: EpochPair,
332        current_consumed_rows: ConsumedRows,
333        buffered_rows: BufferedRows,
334    ) {
335        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
336            return;
337        }
338        tracing::debug!(
339            actor_id = %self.backfill_actor_id,
340            ?epoch,
341            current_consumed_rows,
342            buffered_rows,
343            "progress finish"
344        );
345        // When finishing, report the consumed rows and buffered rows separately
346        self.update_inner(
347            epoch,
348            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
349        );
350    }
351
352    pub(crate) fn update_create_mview_log_store_progress(
353        &mut self,
354        epoch: EpochPair,
355        pending_epoch_lag: u64,
356    ) {
357        assert_matches!(
358            self.state,
359            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
360                | Some(BackfillState::ConsumingLogStore { .. })
361                | None,
362            "cannot update log store progress at state {:?}",
363            self.state
364        );
365        self.update_inner(
366            epoch,
367            BackfillState::ConsumingLogStore { pending_epoch_lag },
368        );
369    }
370
371    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
372        assert_matches!(
373            self.state,
374            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
375                | Some(BackfillState::ConsumingLogStore { .. })
376                | None,
377            "cannot finish log store progress at state {:?}",
378            self.state
379        );
380        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
381    }
382}
383
384impl LocalBarrierManager {
385    /// Create a struct for reporting the progress of creating mview. The backfill executors should
386    /// report the progress of barrier rearranging continuously using this. The updated progress
387    /// will be collected by the local barrier manager and reported to the meta service in this
388    /// epoch.
389    ///
390    /// When all backfill executors of the creating mview finish, the creation progress will be done at
391    /// frontend and the mview will be exposed to the user.
392    pub(crate) fn register_create_mview_progress(
393        &self,
394        actor_ctx: &ActorContext,
395    ) -> CreateMviewProgressReporter {
396        let fragment_id = actor_ctx.fragment_id;
397        let backfill_actor_id = actor_ctx.id;
398        trace!(%fragment_id, %backfill_actor_id, "register create mview progress");
399        CreateMviewProgressReporter::new(self.clone(), fragment_id, backfill_actor_id)
400    }
401}