risingwave_meta/barrier/checkpoint/creating_job/barrier_control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::id::JobId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::BarrierKind;
29use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
30use crate::rpc::metrics::GLOBAL_META_METRICS;
31
32#[derive(Debug)]
33struct CreatingStreamingJobEpochState {
34    epoch: u64,
35    node_to_collect: NodeToCollect,
36    resps: Vec<BarrierCompleteResponse>,
37    kind: BarrierKind,
38    is_first_commit: bool,
39    enqueue_time: Instant,
40}
41
42#[derive(Debug)]
43pub(super) struct CreatingStreamingJobBarrierControl {
44    job_id: JobId,
45    // key is prev_epoch of barrier
46    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
47    snapshot_epoch: u64,
48    is_first_committed: bool,
49    max_collected_epoch: Option<u64>,
50    max_committed_epoch: Option<u64>,
51    // newer epoch at the front.
52    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
53    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
54
55    // metrics
56    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
57    consuming_log_store_barrier_latency: LabelGuardedHistogram,
58
59    wait_commit_latency: LabelGuardedHistogram,
60    inflight_barrier_num: LabelGuardedIntGauge,
61}
62
63impl CreatingStreamingJobBarrierControl {
64    pub(super) fn new(
65        job_id: JobId,
66        snapshot_epoch: u64,
67        is_first_committed: bool,
68        committed_epoch: Option<u64>,
69    ) -> Self {
70        let table_id_str = format!("{}", job_id);
71        Self {
72            job_id,
73            inflight_barrier_queue: Default::default(),
74            snapshot_epoch,
75            is_first_committed,
76            max_collected_epoch: committed_epoch,
77            max_committed_epoch: committed_epoch,
78            pending_barriers_to_complete: Default::default(),
79            completing_barrier: None,
80
81            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
82                .snapshot_backfill_barrier_latency
83                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
84            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
85                .snapshot_backfill_barrier_latency
86                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
87            wait_commit_latency: GLOBAL_META_METRICS
88                .snapshot_backfill_wait_commit_latency
89                .with_guarded_label_values(&[&table_id_str]),
90            inflight_barrier_num: GLOBAL_META_METRICS
91                .snapshot_backfill_inflight_barrier_num
92                .with_guarded_label_values(&[&table_id_str]),
93        }
94    }
95
96    pub(super) fn inflight_barrier_count(&self) -> usize {
97        self.inflight_barrier_queue.len()
98    }
99
100    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
101        self.inflight_barrier_queue
102            .values_mut()
103            .all(|state| is_valid_after_worker_err(&mut state.node_to_collect, worker_id))
104    }
105
106    fn latest_epoch(&self) -> Option<u64> {
107        self.inflight_barrier_queue
108            .last_key_value()
109            .map(|(epoch, _)| *epoch)
110            .or(self.max_collected_epoch)
111    }
112
113    pub(super) fn max_committed_epoch(&self) -> Option<u64> {
114        self.max_committed_epoch
115    }
116
117    pub(super) fn is_empty(&self) -> bool {
118        self.inflight_barrier_queue.is_empty()
119            && self.pending_barriers_to_complete.is_empty()
120            && self.completing_barrier.is_none()
121    }
122
123    pub(super) fn enqueue_epoch(
124        &mut self,
125        epoch: u64,
126        node_to_collect: NodeToCollect,
127        kind: BarrierKind,
128    ) {
129        debug!(
130            epoch,
131            ?node_to_collect,
132            job_id = %self.job_id,
133            "creating job enqueue epoch"
134        );
135        let is_first_commit = !self.is_first_committed;
136        if !self.is_first_committed {
137            self.is_first_committed = true;
138            assert!(
139                kind.is_checkpoint(),
140                "first barrier must be checkpoint barrier"
141            );
142        }
143        match &kind {
144            BarrierKind::Initial => {
145                assert_eq!(
146                    self.latest_epoch().expect(
147                        "should have committed when recovered and injecting Initial barrier"
148                    ),
149                    epoch
150                );
151            }
152            BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
153                if let Some(latest_epoch) = self.latest_epoch() {
154                    assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
155                }
156            }
157        }
158
159        let epoch_state = CreatingStreamingJobEpochState {
160            epoch,
161            node_to_collect,
162            resps: vec![],
163            kind,
164            is_first_commit,
165            enqueue_time: Instant::now(),
166        };
167        if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
168            self.add_collected(epoch_state);
169        } else {
170            self.inflight_barrier_queue.insert(epoch, epoch_state);
171        }
172        self.inflight_barrier_num
173            .set(self.inflight_barrier_queue.len() as _);
174    }
175
176    pub(super) fn collect(&mut self, resp: BarrierCompleteResponse) {
177        let epoch = resp.epoch;
178        let worker_id = resp.worker_id;
179        debug!(
180            epoch,
181            %worker_id,
182            job_id = %self.job_id,
183            "collect barrier from worker"
184        );
185
186        let state = self
187            .inflight_barrier_queue
188            .get_mut(&epoch)
189            .expect("should exist");
190        assert!(state.node_to_collect.remove(&worker_id).is_some());
191        state.resps.push(resp);
192        while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
193            && state.node_to_collect.is_empty()
194        {
195            let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
196            self.add_collected(state);
197        }
198
199        self.inflight_barrier_num
200            .set(self.inflight_barrier_queue.len() as _);
201    }
202
203    /// Return Some((epoch, resps, `is_first_commit`))
204    ///
205    /// Only epoch within the `epoch_end_bound` can be started.
206    /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
207    /// the creating job won't have higher committed epoch than the upstream.
208    pub(super) fn start_completing(
209        &mut self,
210        epoch_end_bound: Bound<u64>,
211    ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
212        assert!(self.completing_barrier.is_none());
213        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
214        while let Some(epoch_state) = self.pending_barriers_to_complete.back()
215            && epoch_range.contains(&epoch_state.epoch)
216        {
217            let mut epoch_state = self
218                .pending_barriers_to_complete
219                .pop_back()
220                .expect("non-empty");
221            let epoch = epoch_state.epoch;
222            let is_first = epoch_state.is_first_commit;
223            if is_first {
224                assert!(epoch_state.kind.is_checkpoint());
225            } else if !epoch_state.kind.is_checkpoint() {
226                continue;
227            }
228
229            let resps = take(&mut epoch_state.resps);
230            self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
231            return Some((epoch, resps, is_first));
232        }
233        None
234    }
235
236    /// Ack on completing a checkpoint barrier.
237    ///
238    /// Return the upstream epoch to be notified when there is any.
239    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
240        let (epoch_state, wait_commit_timer) =
241            self.completing_barrier.take().expect("should exist");
242        wait_commit_timer.observe_duration();
243        assert_eq!(epoch_state.epoch, completed_epoch);
244        if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
245            assert!(completed_epoch > prev_max_committed_epoch);
246        }
247    }
248
249    fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
250        assert!(epoch_state.node_to_collect.is_empty());
251        if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
252            assert!(prev_epoch_state.epoch < epoch_state.epoch);
253        }
254        if let Some(max_collected_epoch) = self.max_collected_epoch {
255            match &epoch_state.kind {
256                BarrierKind::Initial => {
257                    assert_eq!(epoch_state.epoch, max_collected_epoch);
258                }
259                BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
260                    assert!(epoch_state.epoch > max_collected_epoch);
261                }
262            }
263        }
264        self.max_collected_epoch = Some(epoch_state.epoch);
265        let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
266        let barrier_latency_metrics = if epoch_state.epoch < self.snapshot_epoch {
267            &self.consuming_snapshot_barrier_latency
268        } else {
269            &self.consuming_log_store_barrier_latency
270        };
271        barrier_latency_metrics.observe(barrier_latency);
272        if !epoch_state.kind.is_initial() {
273            self.pending_barriers_to_complete.push_front(epoch_state);
274        }
275    }
276}