// risingwave_meta/barrier/checkpoint/creating_job/barrier_control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, VecDeque};
use std::mem::take;
use std::ops::Bound::Unbounded;
use std::ops::{Bound, RangeBounds};
use std::time::Instant;

use prometheus::HistogramTimer;
use risingwave_common::catalog::TableId;
use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
use risingwave_meta_model::WorkerId;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use tracing::debug;

use crate::barrier::BarrierKind;
use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
use crate::rpc::metrics::GLOBAL_META_METRICS;

32#[derive(Debug)]
33struct CreatingStreamingJobEpochState {
34    epoch: u64,
35    node_to_collect: NodeToCollect,
36    resps: Vec<BarrierCompleteResponse>,
37    kind: BarrierKind,
38    is_first_commit: bool,
39    enqueue_time: Instant,
40}
41
42#[derive(Debug)]
43pub(super) struct CreatingStreamingJobBarrierControl {
44    table_id: TableId,
45    // key is prev_epoch of barrier
46    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
47    backfill_epoch: u64,
48    is_first_committed: bool,
49    max_collected_epoch: Option<u64>,
50    // newer epoch at the front.
51    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
52    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
53
54    // metrics
55    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
56    consuming_log_store_barrier_latency: LabelGuardedHistogram,
57
58    wait_commit_latency: LabelGuardedHistogram,
59    inflight_barrier_num: LabelGuardedIntGauge,
60}
61
62impl CreatingStreamingJobBarrierControl {
63    pub(super) fn new(table_id: TableId, backfill_epoch: u64, is_first_committed: bool) -> Self {
64        let table_id_str = format!("{}", table_id.table_id);
65        Self {
66            table_id,
67            inflight_barrier_queue: Default::default(),
68            backfill_epoch,
69            is_first_committed,
70            max_collected_epoch: None,
71            pending_barriers_to_complete: Default::default(),
72            completing_barrier: None,
73
74            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
75                .snapshot_backfill_barrier_latency
76                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
77            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
78                .snapshot_backfill_barrier_latency
79                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
80            wait_commit_latency: GLOBAL_META_METRICS
81                .snapshot_backfill_wait_commit_latency
82                .with_guarded_label_values(&[&table_id_str]),
83            inflight_barrier_num: GLOBAL_META_METRICS
84                .snapshot_backfill_inflight_barrier_num
85                .with_guarded_label_values(&[&table_id_str]),
86        }
87    }
88
89    pub(super) fn inflight_barrier_count(&self) -> usize {
90        self.inflight_barrier_queue.len()
91    }
92
93    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
94        self.inflight_barrier_queue
95            .values_mut()
96            .all(|state| is_valid_after_worker_err(&mut state.node_to_collect, worker_id))
97    }
98
99    fn latest_epoch(&self) -> Option<u64> {
100        self.inflight_barrier_queue
101            .last_key_value()
102            .map(|(epoch, _)| *epoch)
103            .or(self.max_collected_epoch)
104    }
105
106    pub(super) fn max_collected_epoch(&self) -> Option<u64> {
107        self.max_collected_epoch
108    }
109
110    pub(super) fn is_empty(&self) -> bool {
111        self.inflight_barrier_queue.is_empty()
112            && self.pending_barriers_to_complete.is_empty()
113            && self.completing_barrier.is_none()
114    }
115
116    pub(super) fn enqueue_epoch(
117        &mut self,
118        epoch: u64,
119        node_to_collect: NodeToCollect,
120        kind: BarrierKind,
121    ) {
122        debug!(
123            epoch,
124            ?node_to_collect,
125            table_id = self.table_id.table_id,
126            "creating job enqueue epoch"
127        );
128        let is_first_commit = !self.is_first_committed;
129        if !self.is_first_committed {
130            self.is_first_committed = true;
131            assert!(
132                kind.is_checkpoint(),
133                "first barrier must be checkpoint barrier"
134            );
135        }
136        if let Some(latest_epoch) = self.latest_epoch() {
137            assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
138        }
139        let epoch_state = CreatingStreamingJobEpochState {
140            epoch,
141            node_to_collect,
142            resps: vec![],
143            kind,
144            is_first_commit,
145            enqueue_time: Instant::now(),
146        };
147        if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
148            self.add_collected(epoch_state);
149        } else {
150            self.inflight_barrier_queue.insert(epoch, epoch_state);
151        }
152        self.inflight_barrier_num
153            .set(self.inflight_barrier_queue.len() as _);
154    }
155
156    pub(super) fn collect(&mut self, resp: BarrierCompleteResponse) {
157        let epoch = resp.epoch;
158        let worker_id = resp.worker_id as WorkerId;
159        debug!(
160            epoch,
161            worker_id,
162            table_id = self.table_id.table_id,
163            "collect barrier from worker"
164        );
165
166        let state = self
167            .inflight_barrier_queue
168            .get_mut(&epoch)
169            .expect("should exist");
170        assert!(state.node_to_collect.remove(&worker_id).is_some());
171        state.resps.push(resp);
172        while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
173            && state.node_to_collect.is_empty()
174        {
175            let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
176            self.add_collected(state);
177        }
178
179        self.inflight_barrier_num
180            .set(self.inflight_barrier_queue.len() as _);
181    }
182
183    /// Return Some((epoch, resps, `is_first_commit`))
184    ///
185    /// Only epoch within the `epoch_end_bound` can be started.
186    /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
187    /// the creating job won't have higher committed epoch than the upstream.
188    pub(super) fn start_completing(
189        &mut self,
190        epoch_end_bound: Bound<u64>,
191    ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
192        assert!(self.completing_barrier.is_none());
193        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
194        while let Some(epoch_state) = self.pending_barriers_to_complete.back()
195            && epoch_range.contains(&epoch_state.epoch)
196        {
197            let mut epoch_state = self
198                .pending_barriers_to_complete
199                .pop_back()
200                .expect("non-empty");
201            let epoch = epoch_state.epoch;
202            let is_first = epoch_state.is_first_commit;
203            if is_first {
204                assert!(epoch_state.kind.is_checkpoint());
205            } else if !epoch_state.kind.is_checkpoint() {
206                continue;
207            }
208
209            let resps = take(&mut epoch_state.resps);
210            self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
211            return Some((epoch, resps, is_first));
212        }
213        None
214    }
215
216    /// Ack on completing a checkpoint barrier.
217    ///
218    /// Return the upstream epoch to be notified when there is any.
219    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
220        let (epoch_state, wait_commit_timer) =
221            self.completing_barrier.take().expect("should exist");
222        wait_commit_timer.observe_duration();
223        assert_eq!(epoch_state.epoch, completed_epoch);
224    }
225
226    fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
227        assert!(epoch_state.node_to_collect.is_empty());
228        if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
229            assert!(prev_epoch_state.epoch < epoch_state.epoch);
230        }
231        if let Some(max_collected_epoch) = self.max_collected_epoch {
232            assert!(epoch_state.epoch > max_collected_epoch);
233        }
234        self.max_collected_epoch = Some(epoch_state.epoch);
235        let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
236        let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch {
237            &self.consuming_snapshot_barrier_latency
238        } else {
239            &self.consuming_log_store_barrier_latency
240        };
241        barrier_latency_metrics.observe(barrier_latency);
242        if !epoch_state.kind.is_initial() {
243            self.pending_barriers_to_complete.push_front(epoch_state);
244        }
245    }
246}