// risingwave_meta/barrier/checkpoint/creating_job/barrier_control.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::VecDeque;
16use std::mem::take;
17use std::ops::Bound::Unbounded;
18use std::ops::{Bound, RangeBounds};
19use std::time::Instant;
20
21use prometheus::HistogramTimer;
22use risingwave_common::id::JobId;
23use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
24use risingwave_pb::stream_service::BarrierCompleteResponse;
25use tracing::debug;
26
27use crate::barrier::BarrierKind;
28use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
29use crate::barrier::notifier::Notifier;
30use crate::barrier::partial_graph::CollectedBarrier;
31use crate::rpc::metrics::GLOBAL_META_METRICS;
32
33#[derive(Debug)]
34struct CreatingStreamingJobEpochState {
35    epoch: u64,
36    resps: Vec<BarrierCompleteResponse>,
37    notifiers: Vec<Notifier>,
38    kind: BarrierKind,
39    first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
40    enqueue_time: Instant,
41}
42
43#[derive(Debug)]
44pub(super) struct CreatingStreamingJobBarrierControl {
45    job_id: JobId,
46    // newer epoch at the front. `push_front` and `pop_back`
47    inflight_barrier_queue: VecDeque<CreatingStreamingJobEpochState>,
48    snapshot_epoch: u64,
49    max_collected_epoch: Option<u64>,
50    max_committed_epoch: Option<u64>,
51    // newer epoch at the front. `push_front` and `pop_back`
52    pending_barriers_to_complete: VecDeque<CreatingStreamingJobEpochState>,
53    completing_barrier: Option<(CreatingStreamingJobEpochState, HistogramTimer)>,
54
55    // metrics
56    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
57    consuming_log_store_barrier_latency: LabelGuardedHistogram,
58
59    wait_commit_latency: LabelGuardedHistogram,
60    inflight_barrier_num: LabelGuardedIntGauge,
61}
62
63impl CreatingStreamingJobBarrierControl {
64    pub(super) fn new(job_id: JobId, snapshot_epoch: u64, committed_epoch: Option<u64>) -> Self {
65        let table_id_str = format!("{}", job_id);
66        Self {
67            job_id,
68            inflight_barrier_queue: Default::default(),
69            snapshot_epoch,
70            max_collected_epoch: committed_epoch,
71            max_committed_epoch: committed_epoch,
72            pending_barriers_to_complete: Default::default(),
73            completing_barrier: None,
74
75            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
76                .snapshot_backfill_barrier_latency
77                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
78            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
79                .snapshot_backfill_barrier_latency
80                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
81            wait_commit_latency: GLOBAL_META_METRICS
82                .snapshot_backfill_wait_commit_latency
83                .with_guarded_label_values(&[&table_id_str]),
84            inflight_barrier_num: GLOBAL_META_METRICS
85                .snapshot_backfill_inflight_barrier_num
86                .with_guarded_label_values(&[&table_id_str]),
87        }
88    }
89
90    pub(super) fn inflight_barrier_count(&self) -> usize {
91        self.inflight_barrier_queue.len()
92    }
93
94    fn latest_epoch(&self) -> Option<u64> {
95        self.inflight_barrier_queue
96            .front()
97            .map(|epoch| epoch.epoch)
98            .or(self.max_collected_epoch)
99    }
100
101    pub(super) fn max_committed_epoch(&self) -> Option<u64> {
102        self.max_committed_epoch
103    }
104
105    pub(super) fn is_empty(&self) -> bool {
106        self.inflight_barrier_queue.is_empty()
107            && self.pending_barriers_to_complete.is_empty()
108            && self.completing_barrier.is_none()
109    }
110
111    pub(super) fn enqueue_epoch(
112        &mut self,
113        epoch: u64,
114        kind: BarrierKind,
115        notifiers: Vec<Notifier>,
116        first_create_info: Option<CreateSnapshotBackfillJobCommandInfo>,
117    ) {
118        debug!(
119            epoch,
120            job_id = %self.job_id,
121            "creating job enqueue epoch"
122        );
123        if first_create_info.is_some() {
124            assert!(
125                kind.is_checkpoint(),
126                "first barrier must be checkpoint barrier"
127            );
128        }
129        match &kind {
130            BarrierKind::Initial => {
131                unreachable!("should not inject initial barrier here");
132            }
133            BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
134                if let Some(latest_epoch) = self.latest_epoch() {
135                    assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
136                }
137            }
138        }
139
140        let epoch_state = CreatingStreamingJobEpochState {
141            epoch,
142            resps: vec![],
143            notifiers,
144            kind,
145            first_create_info,
146            enqueue_time: Instant::now(),
147        };
148        self.inflight_barrier_queue.push_front(epoch_state);
149        self.inflight_barrier_num
150            .set(self.inflight_barrier_queue.len() as _);
151    }
152
153    pub(super) fn collect(&mut self, collected_barrier: CollectedBarrier) {
154        let mut state = self
155            .inflight_barrier_queue
156            .pop_back()
157            .expect("non-empty when collected");
158        assert_eq!(state.epoch, collected_barrier.epoch.prev);
159        assert!(state.resps.is_empty());
160        state.resps.extend(collected_barrier.resps.into_values());
161        self.add_collected(state);
162
163        self.inflight_barrier_num
164            .set(self.inflight_barrier_queue.len() as _);
165    }
166
167    /// Return Some((epoch, resps, `is_first_commit`))
168    ///
169    /// Only epoch within the `epoch_end_bound` can be started.
170    /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
171    /// the creating job won't have higher committed epoch than the upstream.
172    pub(super) fn start_completing(
173        &mut self,
174        epoch_end_bound: Bound<u64>,
175    ) -> Option<(
176        u64,
177        Vec<BarrierCompleteResponse>,
178        Option<CreateSnapshotBackfillJobCommandInfo>,
179    )> {
180        assert!(self.completing_barrier.is_none());
181        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
182        while let Some(epoch_state) = self.pending_barriers_to_complete.back()
183            && epoch_range.contains(&epoch_state.epoch)
184        {
185            let mut epoch_state = self
186                .pending_barriers_to_complete
187                .pop_back()
188                .expect("non-empty");
189            let epoch = epoch_state.epoch;
190            let first_create_info = epoch_state.first_create_info.take();
191            if first_create_info.is_some() {
192                assert!(epoch_state.kind.is_checkpoint());
193            } else if !epoch_state.kind.is_checkpoint() {
194                continue;
195            }
196
197            let resps = take(&mut epoch_state.resps);
198            self.completing_barrier = Some((epoch_state, self.wait_commit_latency.start_timer()));
199            return Some((epoch, resps, first_create_info));
200        }
201        None
202    }
203
204    /// Ack on completing a checkpoint barrier.
205    ///
206    /// Return the upstream epoch to be notified when there is any.
207    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
208        let (epoch_state, wait_commit_timer) =
209            self.completing_barrier.take().expect("should exist");
210        wait_commit_timer.observe_duration();
211        assert_eq!(epoch_state.epoch, completed_epoch);
212        for notifier in epoch_state.notifiers {
213            notifier.notify_collected();
214        }
215        if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
216            assert!(completed_epoch > prev_max_committed_epoch);
217        }
218    }
219
220    fn add_collected(&mut self, epoch_state: CreatingStreamingJobEpochState) {
221        if let Some(prev_epoch_state) = self.pending_barriers_to_complete.front() {
222            assert!(prev_epoch_state.epoch < epoch_state.epoch);
223        }
224        if let Some(max_collected_epoch) = self.max_collected_epoch {
225            match &epoch_state.kind {
226                BarrierKind::Initial => {
227                    unreachable!("should not collect initial barrier here")
228                }
229                BarrierKind::Barrier | BarrierKind::Checkpoint(_) => {
230                    assert!(epoch_state.epoch > max_collected_epoch);
231                }
232            }
233        }
234        self.max_collected_epoch = Some(epoch_state.epoch);
235        let barrier_latency = epoch_state.enqueue_time.elapsed().as_secs_f64();
236        let barrier_latency_metrics = if epoch_state.epoch < self.snapshot_epoch {
237            &self.consuming_snapshot_barrier_latency
238        } else {
239            &self.consuming_log_store_barrier_latency
240        };
241        barrier_latency_metrics.observe(barrier_latency);
242        self.pending_barriers_to_complete.push_front(epoch_state);
243    }
244}