risingwave_meta/barrier/checkpoint/creating_job/
barrier_control.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, VecDeque};
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::id::JobId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::stream_service::BarrierCompleteResponse;
26use tracing::debug;
27
28use crate::barrier::BarrierKind;
29use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
30use crate::barrier::notifier::Notifier;
31use crate::barrier::utils::{NodeToCollect, is_valid_after_worker_err};
32use crate::rpc::metrics::GLOBAL_META_METRICS;
33
/// State tracked for a single barrier of a creating streaming job, created in
/// `enqueue_epoch` and carried through collection and completion.
#[derive(Debug)]
struct CreatingStreamingJobEpochState {
    /// Epoch of the barrier; also the key under which the state is stored in
    /// the in-flight queue.
    epoch: u64,
    /// Workers whose response for this barrier is still outstanding. An entry
    /// is removed on every `collect`; the barrier counts as collected once
    /// this is empty.
    node_to_collect: NodeToCollect,
    /// Responses received so far, one pushed per `collect` call.
    resps: Vec<BarrierCompleteResponse>,
    /// Notifiers on which `notify_collected` is called when the completion of
    /// this barrier is acknowledged.
    notifiers: Vec<Notifier>,
    /// Kind of the barrier. Only checkpoint barriers are handed out for
    /// completion; `Initial` barriers are never queued for completion.
    kind: BarrierKind,
    /// Creation info carried by the first barrier of the job, if this is that
    /// barrier. A barrier carrying it must be a checkpoint barrier.
    first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
    /// When the barrier was enqueued; used to report the barrier latency once
    /// the barrier is collected.
    enqueue_time: Instant,
}
44
/// Tracks the barriers of one creating streaming job through three stages:
/// in flight (waiting for worker responses), pending (collected, waiting to be
/// completed) and completing (handed out, waiting for the ack).
#[derive(Debug)]
pub(super) struct CreatingStreamingJobBarrierControl {
    /// Id of the creating job; used in log lines and as the metric label.
    job_id: JobId,
    // key is prev_epoch of barrier
    /// Barriers that were enqueued but not yet moved to the collected stage,
    /// ordered by epoch.
    inflight_barrier_queue: BTreeMap<u64, CreatingStreamingJobEpochState>,
    /// Barriers with an epoch below this value are reported under the
    /// "consuming_snapshot" latency label, all others under
    /// "consuming_log_store".
    snapshot_epoch: u64,
    /// Epoch of the most recently collected barrier; seeded with the committed
    /// epoch passed to `new`.
    max_collected_epoch: Option<u64>,
    /// Epoch most recently acknowledged via `ack_completed`; seeded with the
    /// committed epoch passed to `new`.
    max_committed_epoch: Option<u64>,
    // newer epoch at the front.
    /// Collected barriers waiting for `start_completing`. New entries are
    /// pushed at the front and consumed from the back, so the oldest epoch is
    /// completed first.
    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
    /// The barrier handed out by `start_completing` and not yet acknowledged,
    /// together with the timer measuring how long the ack takes.
    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,

    // metrics
    /// Enqueue-to-collect latency of barriers below `snapshot_epoch`.
    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
    /// Enqueue-to-collect latency of barriers at or above `snapshot_epoch`.
    consuming_log_store_barrier_latency: LabelGuardedHistogram,

    /// Time between `start_completing` and the matching `ack_completed`.
    wait_commit_latency: LabelGuardedHistogram,
    /// Current length of `inflight_barrier_queue`.
    inflight_barrier_num: LabelGuardedIntGauge,
}
64
65impl CreatingStreamingJobBarrierControl {
66    pub(super) fn new(job_id: JobId, snapshot_epoch: u64, committed_epoch: Option<u64>) -> Self {
67        let table_id_str = format!("{}", job_id);
68        Self {
69            job_id,
70            inflight_barrier_queue: Default::default(),
71            snapshot_epoch,
72            max_collected_epoch: committed_epoch,
73            max_committed_epoch: committed_epoch,
74            pending_barriers_to_complete: Default::default(),
75            completing_barrier: None,
76
77            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
78                .snapshot_backfill_barrier_latency
79                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
80            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
81                .snapshot_backfill_barrier_latency
82                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
83            wait_commit_latency: GLOBAL_META_METRICS
84                .snapshot_backfill_wait_commit_latency
85                .with_guarded_label_values(&[&table_id_str]),
86            inflight_barrier_num: GLOBAL_META_METRICS
87                .snapshot_backfill_inflight_barrier_num
88                .with_guarded_label_values(&[&table_id_str]),
89        }
90    }
91
    /// Number of barriers currently held in the in-flight queue, i.e. enqueued
    /// but not yet handed over to `add_collected`.
    pub(super) fn inflight_barrier_count(&self) -> usize {
        self.inflight_barrier_queue.len()
    }
95
96    pub(super) fn is_valid_after_worker_err(&self, worker_id: WorkerId) -> bool {
97        self.inflight_barrier_queue
98            .values()
99            .all(|state| is_valid_after_worker_err(&state.node_to_collect, worker_id))
100    }
101
102    fn latest_epoch(&self) -> Option<u64> {
103        self.inflight_barrier_queue
104            .last_key_value()
105            .map(|(epoch, _)| *epoch)
106            .or(self.max_collected_epoch)
107    }
108
    /// Epoch most recently acknowledged through `ack_completed`, or the
    /// `committed_epoch` given to `new` when nothing has been acknowledged yet.
    pub(super) fn max_committed_epoch(&self) -> Option<u64> {
        self.max_committed_epoch
    }
112
113    pub(super) fn is_empty(&self) -> bool {
114        self.inflight_barrier_queue.is_empty()
115            && self.pending_barriers_to_complete.is_empty()
116            && self.completing_barrier.is_none()
117    }
118
119    pub(super) fn enqueue_epoch(
120        &mut self,
121        epoch: u64,
122        node_to_collect: NodeToCollect,
123        kind: BarrierKind,
124        notifiers: Vec<Notifier>,
125        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
126    ) {
127        debug!(
128            epoch,
129            ?node_to_collect,
130            job_id = %self.job_id,
131            "creating job enqueue epoch"
132        );
133        if first_create_info.is_some() {
134            assert!(
135                kind.is_checkpoint(),
136                "first barrier must be checkpoint barrier"
137            );
138        }
139        match &kind {
140            BarrierKind::Initial => {
141                assert_eq!(
142                    self.latest_epoch().expect(
143                        "should have committed when recovered and injecting Initial barrier"
144                    ),
145                    epoch
146                );
147            }
148            BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
149                if let Some(latest_epoch) = self.latest_epoch() {
150                    assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
151                }
152            }
153        }
154
155        let epoch_state = CreatingStreamingJobEpochState {
156            epoch,
157            node_to_collect,
158            resps: vec![],
159            notifiers,
160            kind,
161            first_create_info,
162            enqueue_time: Instant::now(),
163        };
164        if epoch_state.node_to_collect.is_empty() && self.inflight_barrier_queue.is_empty() {
165            self.add_collected(epoch_state);
166        } else {
167            self.inflight_barrier_queue.insert(epoch, epoch_state);
168        }
169        self.inflight_barrier_num
170            .set(self.inflight_barrier_queue.len() as _);
171    }
172
173    pub(super) fn collect(&mut self, resp: BarrierCompleteResponse) {
174        let epoch = resp.epoch;
175        let worker_id = resp.worker_id;
176        debug!(
177            epoch,
178            %worker_id,
179            job_id = %self.job_id,
180            "collect barrier from worker"
181        );
182
183        let state = self
184            .inflight_barrier_queue
185            .get_mut(&epoch)
186            .expect("should exist");
187        assert!(state.node_to_collect.remove(&worker_id));
188        state.resps.push(resp);
189        while let Some((_, state)) = self.inflight_barrier_queue.first_key_value()
190            && state.node_to_collect.is_empty()
191        {
192            let (_, state) = self.inflight_barrier_queue.pop_first().expect("non-empty");
193            self.add_collected(state);
194        }
195
196        self.inflight_barrier_num
197            .set(self.inflight_barrier_queue.len() as _);
198    }
199
200    /// Return Some((epoch, resps, `is_first_commit`))
201    ///
202    /// Only epoch within the `epoch_end_bound` can be started.
203    /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
204    /// the creating job won't have higher committed epoch than the upstream.
205    pub(super) fn start_completing(
206        &mut self,
207        epoch_end_bound: Bound<u64>,
208    ) -> Option<(
209        u64,
210        Vec<BarrierCompleteResponse>,
211        Option<CreateSnapshotBackfillJobCommandInfo>,
212    )> {
213        assert!(self.completing_barrier.is_none());
214        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
215        while let Some(epoch_state) = self.pending_barriers_to_complete.back()
216            && epoch_range.contains(&epoch_state.epoch)
217        {
218            let mut epoch_state = self
219                .pending_barriers_to_complete
220                .pop_back()
221                .expect("non-empty");
222            let epoch = epoch_state.epoch;
223            let first_create_info = epoch_state.first_create_info.take();
224            if first_create_info.is_some() {
225                assert!(epoch_state.kind.is_checkpoint());
226            } else if !epoch_state.kind.is_checkpoint() {
227                continue;
228            }
229
230            let resps = take(&mut epoch_state.resps);
231            self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
232            return Some((epoch, resps, first_create_info));
233        }
234        None
235    }
236
237    /// Ack on completing a checkpoint barrier.
238    ///
239    /// Return the upstream epoch to be notified when there is any.
240    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
241        let (epoch_state, wait_commit_timer) =
242            self.completing_barrier.take().expect("should exist");
243        wait_commit_timer.observe_duration();
244        assert_eq!(epoch_state.epoch, completed_epoch);
245        for notifier in epoch_state.notifiers {
246            notifier.notify_collected();
247        }
248        if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
249            assert!(completed_epoch > prev_max_committed_epoch);
250        }
251    }
252
253    fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
254        assert!(epoch_state.node_to_collect.is_empty());
255        if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
256            assert!(prev_epoch_state.epoch < epoch_state.epoch);
257        }
258        if let Some(max_collected_epoch) = self.max_collected_epoch {
259            match &epoch_state.kind {
260                BarrierKind::Initial => {
261                    assert_eq!(epoch_state.epoch, max_collected_epoch);
262                }
263                BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
264                    assert!(epoch_state.epoch > max_collected_epoch);
265                }
266            }
267        }
268        self.max_collected_epoch = Some(epoch_state.epoch);
269        let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
270        let barrier_latency_metrics = if epoch_state.epoch < self.snapshot_epoch {
271            &self.consuming_snapshot_barrier_latency
272        } else {
273            &self.consuming_log_store_barrier_latency
274        };
275        barrier_latency_metrics.observe(barrier_latency);
276        if !epoch_state.kind.is_initial() {
277            self.pending_barriers_to_complete.push_front(epoch_state);
278        }
279    }
280}