risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::catalog::TableId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
29use crate::rpc::metrics::GLOBAL_META_METRICS;
30
/// Bookkeeping for a single barrier of a creating streaming job, kept from the moment the
/// barrier is enqueued until its completion is acknowledged.
#[derive(Debug)]
struct CreatingStreamingJobEpochState {
    /// Epoch identifying this barrier; also used as its key in the inflight barrier queue.
    epoch: u64,
    /// Workers whose response for this barrier is still outstanding. The barrier is treated
    /// as collected once this is empty.
    node_to_collect: NodeToCollect,
    /// Responses gathered so far; one is pushed per `collect` call for this epoch.
    resps: Vec<BarrierCompleteResponse>,
    /// Whether this barrier is a checkpoint barrier. Non-checkpoint barriers are skipped by
    /// `start_completing` unless they are the first commit.
    is_checkpoint: bool,
    /// Set when this barrier was enqueued while the control's `is_first_committed` flag was
    /// still `false`.
    is_first_commit: bool,
    /// When the barrier was enqueued; used to report barrier latency once it is collected.
    enqueue_time: Instant,
}
40
/// Tracks the barriers of one creating streaming job through three stages: in flight
/// (waiting for workers), pending completion (fully collected), and completing (handed out
/// by `start_completing` and not yet acked).
#[derive(Debug)]
pub(super) struct CreatingStreamingJobBarrierControl {
    /// Id of the creating job; used as a log field and as a metrics label.
    table_id: TableId,
    // key is prev_epoch of barrier
    /// Barriers that were enqueued but have not been fully collected yet, ordered by epoch.
    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
    /// Barriers with an epoch strictly below this value are reported under the
    /// "consuming_snapshot" latency label, all others under "consuming_log_store".
    backfill_epoch: u64,
    /// `false` until the first barrier is enqueued; that barrier is flagged as the first
    /// commit and must be a checkpoint barrier.
    is_first_committed: bool,
    /// Epoch of the most recently collected barrier, or `None` if nothing was collected yet.
    max_collected_epoch: Option<u64>,
    // newer epoch at the front.
    /// Fully collected barriers waiting to be picked up by `start_completing`, which pops
    /// from the back (oldest first).
    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
    /// The barrier currently being completed, together with the timer measuring how long
    /// it waits for the commit acknowledgement.
    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,

    // metrics
    /// Enqueue-to-collect latency of barriers with epoch below `backfill_epoch`.
    consuming_snapshot_barrier_latency: LabelGuardedHistogram<2>,
    /// Enqueue-to-collect latency of barriers with epoch at or above `backfill_epoch`.
    consuming_log_store_barrier_latency: LabelGuardedHistogram<2>,

    /// Time between `start_completing` and the matching `ack_completed`.
    wait_commit_latency: LabelGuardedHistogram<1>,
    /// Current length of `inflight_barrier_queue`.
    inflight_barrier_num: LabelGuardedIntGauge<1>,
}
60
61impl CreatingStreamingJobBarrierControl {
62    pub(super) fn new(table_id: TableId, backfill_epoch: u64, is_first_committed: bool) -> Self {
63        let table_id_str = format!("{}", table_id.table_id);
64        Self {
65            table_id,
66            inflight_barrier_queue: Default::default(),
67            backfill_epoch,
68            is_first_committed,
69            max_collected_epoch: None,
70            pending_barriers_to_complete: Default::default(),
71            completing_barrier: None,
72
73            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
74                .snapshot_backfill_barrier_latency
75                .with_guarded_label_values(&[&table_id_str, "consuming_snapshot"]),
76            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
77                .snapshot_backfill_barrier_latency
78                .with_guarded_label_values(&[&table_id_str, "consuming_log_store"]),
79            wait_commit_latency: GLOBAL_META_METRICS
80                .snapshot_backfill_wait_commit_latency
81                .with_guarded_label_values(&[&table_id_str]),
82            inflight_barrier_num: GLOBAL_META_METRICS
83                .snapshot_backfill_inflight_barrier_num
84                .with_guarded_label_values(&[&table_id_str]),
85        }
86    }
87
    /// Returns the number of barriers that have been enqueued but not yet fully collected.
    pub(super) fn inflight_barrier_count(&self) -> usize {
        self.inflight_barrier_queue.len()
    }
91
92    pub(super) fn is_valid_after_worker_err(&mut self, worker_id: WorkerId) -> bool {
93        self.inflight_barrier_queue
94            .values_mut()
95            .all(|state| is_valid_after_worker_err(&mut state.node_to_collect, worker_id))
96    }
97
98    fn latest_epoch(&self) -> Option<u64> {
99        self.inflight_barrier_queue
100            .last_key_value()
101            .map(|(epoch, _)| *epoch)
102            .or(self.max_collected_epoch)
103    }
104
    /// Returns the epoch of the most recently collected barrier, or `None` if no barrier
    /// has been collected yet.
    pub(super) fn max_collected_epoch(&self) -> Option<u64> {
        self.max_collected_epoch
    }
108
109    pub(super) fn is_empty(&self) -> bool {
110        self.inflight_barrier_queue.is_empty()
111            && self.pending_barriers_to_complete.is_empty()
112            && self.completing_barrier.is_none()
113    }
114
115    pub(super) fn enqueue_epoch(
116        &mut self,
117        epoch: u64,
118        node_to_collect: NodeToCollect,
119        is_checkpoint: bool,
120    ) {
121        debug!(
122            epoch,
123            ?node_to_collect,
124            table_id = self.table_id.table_id,
125            "creating job enqueue epoch"
126        );
127        let is_first_commit = !self.is_first_committed;
128        if !self.is_first_committed {
129            self.is_first_committed = true;
130            assert!(is_checkpoint, "first barrier must be checkpoint barrier");
131        }
132        if let Some(latest_epoch) = self.latest_epoch() {
133            assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
134        }
135        let epoch_state = CreatingStreamingJobEpochState {
136            epoch,
137            node_to_collect,
138            resps: vec![],
139            is_checkpoint,
140            is_first_commit,
141            enqueue_time: Instant::now(),
142        };
143        if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
144            self.add_collected(epoch_state);
145        } else {
146            self.inflight_barrier_queue.insert(epoch, epoch_state);
147        }
148        self.inflight_barrier_num
149            .set(self.inflight_barrier_queue.len() as _);
150    }
151
152    pub(super) fn collect(
153        &mut self,
154        epoch: u64,
155        worker_id: WorkerId,
156        resp: BarrierCompleteResponse,
157    ) {
158        debug!(
159            epoch,
160            worker_id,
161            table_id = self.table_id.table_id,
162            "collect barrier from worker"
163        );
164
165        let state = self
166            .inflight_barrier_queue
167            .get_mut(&epoch)
168            .expect("should exist");
169        assert!(state.node_to_collect.remove(&worker_id).is_some());
170        state.resps.push(resp);
171        while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
172            && state.node_to_collect.is_empty()
173        {
174            let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
175            self.add_collected(state);
176        }
177
178        self.inflight_barrier_num
179            .set(self.inflight_barrier_queue.len() as _);
180    }
181
182    /// Return Some((epoch, resps, `is_first_commit`))
183    ///
184    /// Only epoch within the `epoch_end_bound` can be started.
185    /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
186    /// the creating job won't have higher committed epoch than the upstream.
187    pub(super) fn start_completing(
188        &mut self,
189        epoch_end_bound: Bound<u64>,
190    ) -> Option<(u64, Vec<BarrierCompleteResponse>, bool)> {
191        assert!(self.completing_barrier.is_none());
192        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
193        while let Some(epoch_state) = self.pending_barriers_to_complete.back()
194            && epoch_range.contains(&epoch_state.epoch)
195        {
196            let mut epoch_state = self
197                .pending_barriers_to_complete
198                .pop_back()
199                .expect("non-empty");
200            let epoch = epoch_state.epoch;
201            let is_first = epoch_state.is_first_commit;
202            if is_first {
203                assert!(epoch_state.is_checkpoint);
204            } else if !epoch_state.is_checkpoint {
205                continue;
206            }
207
208            let resps = take(&mut epoch_state.resps);
209            self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
210            return Some((epoch, resps, is_first));
211        }
212        None
213    }
214
    /// Ack on completing a checkpoint barrier.
    ///
    /// Clears the completing barrier set by `start_completing` and records the elapsed
    /// wait-commit latency. This method does not return a value.
    ///
    /// # Panics
    ///
    /// Panics if no barrier is currently completing, or if the completing barrier's epoch
    /// differs from `completed_epoch`.
    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
        let (epoch_state, wait_commit_timer) =
            self.completing_barrier.take().expect("should exist");
        // The latency is recorded before the epoch check below.
        wait_commit_timer.observe_duration();
        assert_eq!(epoch_state.epoch, completed_epoch);
    }
224
225    fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
226        assert!(epoch_state.node_to_collect.is_empty());
227        if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
228            assert!(prev_epoch_state.epoch < epoch_state.epoch);
229        }
230        if let Some(max_collected_epoch) = self.max_collected_epoch {
231            assert!(epoch_state.epoch > max_collected_epoch);
232        }
233        self.max_collected_epoch = Some(epoch_state.epoch);
234        let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
235        let barrier_latency_metrics = if epoch_state.epoch < self.backfill_epoch {
236            &self.consuming_snapshot_barrier_latency
237        } else {
238            &self.consuming_log_store_barrier_latency
239        };
240        barrier_latency_metrics.observe(barrier_latency);
241        self.pending_barriers_to_complete.push_front(epoch_state);
242    }
243}