risingwave_stream/task/barrier_manager/progress.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::fmt::{Display, Formatter};

use risingwave_common::util::epoch::EpochPair;
use risingwave_pb::id::FragmentId;
use risingwave_pb::stream_service::barrier_complete_response::PbCreateMviewProgress;

use crate::executor::ActorContext;
use crate::task::barrier_manager::LocalBarrierEvent::ReportCreateProgress;
use crate::task::barrier_worker::managed_state::PartialGraphState;
use crate::task::cdc_progress::CdcTableBackfillState;
use crate::task::{ActorId, LocalBarrierManager};

type ConsumedEpoch = u64;
type ConsumedRows = u64;
type BufferedRows = u64;

#[derive(Debug, Clone, Copy)]
pub(crate) enum BackfillState {
    ConsumingUpstreamTableOrSource(ConsumedEpoch, ConsumedRows, BufferedRows),
    DoneConsumingUpstreamTableOrSource(ConsumedRows, BufferedRows),
    ConsumingLogStore { pending_epoch_lag: u64 },
    DoneConsumingLogStore,
}

impl BackfillState {
    pub fn to_pb(self, fragment_id: FragmentId, actor_id: ActorId) -> PbCreateMviewProgress {
        let (done, consumed_epoch, consumed_rows, pending_epoch_lag, buffered_rows) = match self {
            BackfillState::ConsumingUpstreamTableOrSource(
                consumed_epoch,
                consumed_rows,
                buffered_rows,
            ) => (false, consumed_epoch, consumed_rows, 0, buffered_rows),
            BackfillState::DoneConsumingUpstreamTableOrSource(consumed_rows, buffered_rows) => {
                (true, 0, consumed_rows, 0, buffered_rows)
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                (false, 0, 0, pending_epoch_lag, 0)
            }
            BackfillState::DoneConsumingLogStore => (true, 0, 0, 0, 0),
        };
        PbCreateMviewProgress {
            backfill_actor_id: actor_id,
            done,
            consumed_epoch,
            consumed_rows,
            pending_epoch_lag,
            buffered_rows,
            fragment_id,
        }
    }
}
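
// Illustrative sketch, not part of the original file: pins down how each
// `BackfillState` variant maps onto `PbCreateMviewProgress` via `to_pb` above.
// The module name and the id/row values are arbitrary test inputs; the field
// expectations simply mirror the match arms in `to_pb`.
#[cfg(test)]
mod to_pb_example {
    use super::*;

    #[test]
    fn consuming_state_reports_epoch_and_rows() {
        let pb =
            BackfillState::ConsumingUpstreamTableOrSource(5, 100, 10).to_pb(1.into(), 2.into());
        assert!(!pb.done);
        assert_eq!(pb.consumed_epoch, 5);
        assert_eq!(pb.consumed_rows, 100);
        assert_eq!(pb.pending_epoch_lag, 0);
        assert_eq!(pb.buffered_rows, 10);
    }

    #[test]
    fn done_log_store_state_only_sets_done_flag() {
        let pb = BackfillState::DoneConsumingLogStore.to_pb(1.into(), 2.into());
        assert!(pb.done);
        assert_eq!(pb.consumed_rows, 0);
        assert_eq!(pb.pending_epoch_lag, 0);
    }
}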

impl Display for BackfillState {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        match self {
            BackfillState::ConsumingUpstreamTableOrSource(epoch, rows, buffered) => {
                write!(
                    f,
                    "ConsumingUpstreamTable(epoch: {}, rows: {}, buffered: {})",
                    epoch, rows, buffered
                )
            }
            BackfillState::DoneConsumingUpstreamTableOrSource(rows, buffered) => {
                write!(
                    f,
                    "DoneConsumingUpstreamTable(rows: {}, buffered: {})",
                    rows, buffered
                )
            }
            BackfillState::ConsumingLogStore { pending_epoch_lag } => {
                write!(
                    f,
                    "ConsumingLogStore(pending_epoch_lag: {pending_epoch_lag})"
                )
            }
            BackfillState::DoneConsumingLogStore => {
                write!(f, "DoneConsumingLogStore")
            }
        }
    }
}
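
// A small sanity sketch, added for illustration: fixes the exact `Display`
// strings produced by the impl above, which is what shows up when these
// states are logged.
#[cfg(test)]
mod display_example {
    use super::*;

    #[test]
    fn backfill_state_display_format() {
        assert_eq!(
            BackfillState::ConsumingUpstreamTableOrSource(1, 2, 3).to_string(),
            "ConsumingUpstreamTable(epoch: 1, rows: 2, buffered: 3)"
        );
        assert_eq!(
            BackfillState::ConsumingLogStore {
                pending_epoch_lag: 4
            }
            .to_string(),
            "ConsumingLogStore(pending_epoch_lag: 4)"
        );
    }
}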

impl PartialGraphState {
    pub(crate) fn update_create_mview_progress(
        &mut self,
        epoch: EpochPair,
        fragment_id: FragmentId,
        actor: ActorId,
        state: BackfillState,
    ) {
        if let Some((prev_fragment_id, _)) = self
            .graph_state
            .create_mview_progress
            .entry(epoch.curr)
            .or_default()
            .insert(actor, (fragment_id, state))
        {
            assert_eq!(prev_fragment_id, fragment_id)
        }
    }

    pub(crate) fn update_cdc_table_backfill_progress(
        &mut self,
        epoch: EpochPair,
        actor: ActorId,
        state: CdcTableBackfillState,
    ) {
        if let Some(actor_state) = self.actor_states.get(&actor)
            && actor_state.inflight_barriers.contains(&epoch.prev)
        {
            self.graph_state
                .cdc_table_backfill_progress
                .entry(epoch.curr)
                .or_default()
                .insert(actor, state);
        } else {
            warn!(?epoch, %actor, ?state, "ignore CDC table backfill progress");
        }
    }
}

impl LocalBarrierManager {
    fn update_create_mview_progress(
        &self,
        epoch: EpochPair,
        fragment_id: FragmentId,
        actor: ActorId,
        state: BackfillState,
    ) {
        self.send_event(ReportCreateProgress {
            epoch,
            fragment_id,
            actor,
            state,
        })
    }
}

/// The progress held by the backfill executors to report to the local barrier manager.
///
/// Progress can be computed by
/// `total_rows_consumed` / `total_rows_upstream`.
/// This yields the (approximate) percentage of rows we are done backfilling.
///
/// For `total_rows_consumed`, the progress is tracked in the following way:
/// 1. Fetching the row count from our state table.
///    This number is the total number, NOT incremental.
///    This is done per actor.
/// 2. Refreshing this number on the meta side, on every barrier.
///    This is done by just summing up all the row counts from the actors.
///
/// For `total_rows_upstream`,
/// this is fetched from `HummockVersion`'s statistics (`TableStats::total_key_count`).
///
/// This is computed per `HummockVersion`, which is updated whenever a checkpoint is committed.
/// The `total_key_count` figure just counts the number of storage keys.
/// For example, if a key is inserted and then deleted,
/// it results in two storage entries in the LSM tree, so count=2.
/// Only after compaction will the count drop back to 0.
///
/// So the total count can be more pessimistic than the actual progress.
///
/// It is fine for this number not to be precise,
/// since we don't rely on it to update the status of a stream job internally.
///
/// TODO(kwannoel): Perhaps it is possible to get the total key count of the replicated state table
/// for arrangement backfill. We can use that to estimate the progress as well, and avoid recording
/// `row_count` state for it.
pub struct CreateMviewProgressReporter {
    barrier_manager: LocalBarrierManager,

    fragment_id: FragmentId,

    /// The id of the actor containing the backfill executors.
    backfill_actor_id: ActorId,

    state: Option<BackfillState>,
}
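
// Hedged sketch, not part of the original file: the percentage computation
// described in the doc comment above. `total_rows_consumed` is assumed to be
// the sum of per-actor row counts and `total_rows_upstream` the
// `TableStats::total_key_count` figure; since the latter counts storage
// entries rather than live rows, the estimate is pessimistic and clamped.
#[allow(dead_code)]
fn estimated_backfill_percentage(total_rows_consumed: u64, total_rows_upstream: u64) -> f64 {
    if total_rows_upstream == 0 {
        // An empty upstream is trivially fully backfilled.
        return 100.0;
    }
    // Consumed rows may transiently exceed the stale upstream statistic.
    (total_rows_consumed as f64 / total_rows_upstream as f64 * 100.0).min(100.0)
}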

impl CreateMviewProgressReporter {
    pub fn new(
        barrier_manager: LocalBarrierManager,
        fragment_id: FragmentId,
        backfill_actor_id: ActorId,
    ) -> Self {
        Self {
            barrier_manager,
            fragment_id,
            backfill_actor_id,
            state: None,
        }
    }

    #[cfg(test)]
    pub fn for_test(barrier_manager: LocalBarrierManager) -> Self {
        Self::new(barrier_manager, 0.into(), 0.into())
    }

    pub fn actor_id(&self) -> ActorId {
        self.backfill_actor_id
    }

    fn update_inner(&mut self, epoch: EpochPair, state: BackfillState) {
        self.state = Some(state);
        self.barrier_manager.update_create_mview_progress(
            epoch,
            self.fragment_id,
            self.backfill_actor_id,
            state,
        );
    }

    /// Update the progress to `ConsumingUpstreamTableOrSource(consumed_epoch, consumed_rows, 0)`.
    /// The consumed epoch must be monotonically increasing.
    /// The `epoch` pair should be provided to locate the barrier under concurrent checkpoint.
    /// `current_consumed_rows` is an accumulated value.
    pub fn update(
        &mut self,
        epoch: EpochPair,
        consumed_epoch: ConsumedEpoch,
        current_consumed_rows: ConsumedRows,
    ) {
        self.update_with_buffered_rows(epoch, consumed_epoch, current_consumed_rows, 0);
    }

    /// Update the progress with buffered rows information.
    /// This is used by locality backfill to report precise progress including buffered data.
    pub fn update_with_buffered_rows(
        &mut self,
        epoch: EpochPair,
        consumed_epoch: ConsumedEpoch,
        current_consumed_rows: ConsumedRows,
        buffered_rows: BufferedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(last, last_consumed_rows, _)) => {
                assert!(
                    last <= consumed_epoch,
                    "last_epoch: {:#?} must not be greater than consumed_epoch: {:#?}",
                    last,
                    consumed_epoch
                );
                assert!(last_consumed_rows <= current_consumed_rows);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        tracing::debug!(
            actor_id = %self.backfill_actor_id,
            ?epoch,
            consumed_epoch,
            current_consumed_rows,
            buffered_rows,
            "progress update"
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingUpstreamTableOrSource(
                consumed_epoch,
                current_consumed_rows,
                buffered_rows,
            ),
        );
    }

    /// The difference from [`Self::update`] (MV backfill) is that we
    /// don't care about `ConsumedEpoch` here.
    pub fn update_for_source_backfill(
        &mut self,
        epoch: EpochPair,
        current_consumed_rows: ConsumedRows,
    ) {
        match self.state {
            Some(BackfillState::ConsumingUpstreamTableOrSource(
                dummy_last_epoch,
                _last_consumed_rows,
                _,
            )) => {
                debug_assert_eq!(dummy_last_epoch, 0);
            }
            Some(state) => {
                panic!(
                    "should not update consuming progress at invalid state: {:?}",
                    state
                )
            }
            None => {}
        };
        self.update_inner(
            epoch,
            // fill a dummy ConsumedEpoch and no buffered rows
            BackfillState::ConsumingUpstreamTableOrSource(0, current_consumed_rows, 0),
        );
    }

    /// Finish the progress. If the progress is already finished, this is a no-op.
    /// The `epoch` pair should be provided to locate the barrier under concurrent checkpoint.
    pub fn finish(&mut self, epoch: EpochPair, current_consumed_rows: ConsumedRows) {
        self.finish_with_buffered_rows(epoch, current_consumed_rows, 0);
    }

    /// Finish the progress with buffered rows information.
    /// This is used by locality backfill to report any remaining buffered rows at completion.
    pub fn finish_with_buffered_rows(
        &mut self,
        epoch: EpochPair,
        current_consumed_rows: ConsumedRows,
        buffered_rows: BufferedRows,
    ) {
        if let Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _)) = self.state {
            return;
        }
        tracing::debug!(
            actor_id = %self.backfill_actor_id,
            ?epoch,
            current_consumed_rows,
            buffered_rows,
            "progress finish"
        );
        // When finishing, report the consumed rows and buffered rows separately
        self.update_inner(
            epoch,
            BackfillState::DoneConsumingUpstreamTableOrSource(current_consumed_rows, buffered_rows),
        );
    }

    pub(crate) fn update_create_mview_log_store_progress(
        &mut self,
        epoch: EpochPair,
        pending_epoch_lag: u64,
    ) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot update log store progress at state {:?}",
            self.state
        );
        self.update_inner(
            epoch,
            BackfillState::ConsumingLogStore { pending_epoch_lag },
        );
    }

    pub(crate) fn finish_consuming_log_store(&mut self, epoch: EpochPair) {
        assert_matches!(
            self.state,
            Some(BackfillState::DoneConsumingUpstreamTableOrSource(_, _))
                | Some(BackfillState::ConsumingLogStore { .. })
                | None,
            "cannot finish log store progress at state {:?}",
            self.state
        );
        self.update_inner(epoch, BackfillState::DoneConsumingLogStore);
    }
}

impl LocalBarrierManager {
    /// Create a struct for reporting the progress of creating an mview. The backfill executors
    /// should report the progress of barrier rearranging continuously using this reporter. The
    /// updated progress will be collected by the local barrier manager and reported to the meta
    /// service in this epoch.
    ///
    /// When all backfill executors of the creating mview finish, the creation will be marked as
    /// finished on the frontend and the mview will be exposed to the user.
    pub(crate) fn register_create_mview_progress(
        &self,
        actor_ctx: &ActorContext,
    ) -> CreateMviewProgressReporter {
        let fragment_id = actor_ctx.fragment_id;
        let backfill_actor_id = actor_ctx.id;
        trace!(%fragment_id, %backfill_actor_id, "register create mview progress");
        CreateMviewProgressReporter::new(self.clone(), fragment_id, backfill_actor_id)
    }
}
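
// Usage sketch, illustrative only (`actor_ctx`, `barrier`, `consumed_epoch`,
// `snapshot_rows` and `pending_epoch_lag` are hypothetical names): a backfill
// executor registers a reporter once, then drives it through the transitions
// enforced by the assertions in `CreateMviewProgressReporter`.
//
//     let mut reporter = local_barrier_manager.register_create_mview_progress(&actor_ctx);
//     // While consuming the upstream table: epochs and row counts are monotone.
//     reporter.update(barrier.epoch, consumed_epoch, snapshot_rows);
//     // Once the snapshot is exhausted (a no-op if already finished):
//     reporter.finish(barrier.epoch, snapshot_rows);
//     // If the creating job then drains a log store:
//     reporter.update_create_mview_log_store_progress(barrier.epoch, pending_epoch_lag);
//     reporter.finish_consuming_log_store(barrier.epoch);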