risingwave_meta/barrier/checkpoint/creating_job/barrier_control.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::VecDeque;
16use std::ops::Bound::Unbounded;
17use std::ops::{Bound, RangeBounds};
18
19use risingwave_common::id::JobId;
20use risingwave_common::metrics::{LabelGuardedHistogram, LabelGuardedIntGauge};
21use risingwave_common::util::epoch::EpochPair;
22use risingwave_pb::stream_service::BarrierCompleteResponse;
23use tracing::debug;
24
25use crate::barrier::notifier::Notifier;
26use crate::barrier::partial_graph::{CollectedBarrier, PartialGraphBarrierInfo, PartialGraphStat};
27use crate::rpc::metrics::GLOBAL_META_METRICS;
28
29pub(super) struct CreatingStreamingJobBarrierStats {
30    consuming_snapshot_barrier_latency: LabelGuardedHistogram,
31    consuming_log_store_barrier_latency: LabelGuardedHistogram,
32    inflight_barrier_num: LabelGuardedIntGauge,
33
34    snapshot_epoch: u64,
35}
36
37impl CreatingStreamingJobBarrierStats {
38    pub(super) fn new(job_id: JobId, snapshot_epoch: u64) -> Self {
39        let table_id_str = format!("{}", job_id);
40        Self {
41            snapshot_epoch,
42            consuming_snapshot_barrier_latency: GLOBAL_META_METRICS
43                .snapshot_backfill_barrier_latency
44                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_snapshot"]),
45            consuming_log_store_barrier_latency: GLOBAL_META_METRICS
46                .snapshot_backfill_barrier_latency
47                .with_guarded_label_values(&[table_id_str.as_str(), "consuming_log_store"]),
48            inflight_barrier_num: GLOBAL_META_METRICS
49                .snapshot_backfill_inflight_barrier_num
50                .with_guarded_label_values(&[&table_id_str]),
51        }
52    }
53}
54
55impl PartialGraphStat for CreatingStreamingJobBarrierStats {
56    fn observe_barrier_latency(&self, epoch: EpochPair, barrier_latency_secs: f64) {
57        let barrier_latency_metrics = if epoch.prev < self.snapshot_epoch {
58            &self.consuming_snapshot_barrier_latency
59        } else {
60            &self.consuming_log_store_barrier_latency
61        };
62        barrier_latency_metrics.observe(barrier_latency_secs);
63    }
64
65    fn observe_barrier_num(&self, inflight_barrier_num: usize, _collected_barrier_num: usize) {
66        self.inflight_barrier_num.set(inflight_barrier_num as _);
67    }
68}
69
70#[derive(Debug)]
71pub(super) struct CreatingStreamingJobBarrierControl {
72    job_id: JobId,
73    // newer epoch at the front. `push_front` and `pop_back`
74    inflight_barrier_queue: VecDeque<u64>,
75    max_collected_epoch: Option<u64>,
76    max_committed_epoch: Option<u64>,
77    // newer epoch at the front. `push_front` and `pop_back`
78    pending_barriers_to_complete: VecDeque<u64>,
79    completing_barrier: Option<u64>,
80}
81
82impl CreatingStreamingJobBarrierControl {
83    pub(super) fn new(job_id: JobId, committed_epoch: Option<u64>) -> Self {
84        Self {
85            job_id,
86            inflight_barrier_queue: Default::default(),
87            max_collected_epoch: committed_epoch,
88            max_committed_epoch: committed_epoch,
89            pending_barriers_to_complete: Default::default(),
90            completing_barrier: None,
91        }
92    }
93
94    pub(super) fn inflight_barrier_count(&self) -> usize {
95        self.inflight_barrier_queue.len()
96    }
97
98    fn latest_epoch(&self) -> Option<u64> {
99        self.inflight_barrier_queue
100            .front()
101            .copied()
102            .or(self.max_collected_epoch)
103    }
104
105    pub(super) fn max_committed_epoch(&self) -> Option<u64> {
106        self.max_committed_epoch
107    }
108
109    pub(super) fn is_empty(&self) -> bool {
110        self.inflight_barrier_queue.is_empty()
111            && self.pending_barriers_to_complete.is_empty()
112            && self.completing_barrier.is_none()
113    }
114
115    pub(super) fn enqueue_epoch(&mut self, epoch: u64) {
116        debug!(
117            epoch,
118            job_id = %self.job_id,
119            "creating job enqueue epoch"
120        );
121        if let Some(latest_epoch) = self.latest_epoch() {
122            assert!(epoch > latest_epoch, "{} {}", epoch, latest_epoch);
123        }
124
125        self.inflight_barrier_queue.push_front(epoch);
126    }
127
128    pub(super) fn collect(&mut self, collected_barrier: CollectedBarrier<'_>) {
129        let epoch = self
130            .inflight_barrier_queue
131            .pop_back()
132            .expect("non-empty when collected");
133        assert_eq!(epoch, collected_barrier.epoch.prev);
134        self.add_collected(collected_barrier);
135    }
136
137    /// Return Some((epoch, resps, `is_first_commit`))
138    ///
139    /// Only epoch within the `epoch_end_bound` can be started.
140    /// Usually `epoch_end_bound` is the upstream committed epoch. This is to ensure that
141    /// the creating job won't have higher committed epoch than the upstream.
142    pub(super) fn start_completing(
143        &mut self,
144        epoch_end_bound: Bound<u64>,
145        mut take_resps: impl FnMut(u64) -> (Vec<BarrierCompleteResponse>, PartialGraphBarrierInfo),
146    ) -> Option<(u64, Vec<BarrierCompleteResponse>, PartialGraphBarrierInfo)> {
147        assert!(self.completing_barrier.is_none());
148        let epoch_range: (Bound<u64>, Bound<u64>) = (Unbounded, epoch_end_bound);
149        while let Some(&epoch) = self.pending_barriers_to_complete.back()
150            && epoch_range.contains(&epoch)
151        {
152            let epoch_state = self
153                .pending_barriers_to_complete
154                .pop_back()
155                .expect("non-empty");
156            let (resps, info) = take_resps(epoch);
157            if info.post_collect_command.should_checkpoint() {
158                assert!(info.barrier_info.kind.is_checkpoint());
159            } else if !info.barrier_info.kind.is_checkpoint() {
160                info.notifiers
161                    .into_iter()
162                    .for_each(Notifier::notify_collected);
163                continue;
164            }
165            self.completing_barrier = Some(epoch_state);
166            return Some((epoch, resps, info));
167        }
168        None
169    }
170
171    /// Ack on completing a checkpoint barrier.
172    ///
173    /// Return the upstream epoch to be notified when there is any.
174    pub(super) fn ack_completed(&mut self, completed_epoch: u64) {
175        let epoch = self.completing_barrier.take().expect("should exist");
176        assert_eq!(epoch, completed_epoch);
177        if let Some(prev_max_committed_epoch) = self.max_committed_epoch.replace(completed_epoch) {
178            assert!(completed_epoch > prev_max_committed_epoch);
179        }
180    }
181
182    fn add_collected(&mut self, collected_barrier: CollectedBarrier<'_>) {
183        let epoch = collected_barrier.epoch.prev;
184        if let Some(prev_epoch) = self.pending_barriers_to_complete.front() {
185            assert!(*prev_epoch < epoch);
186        }
187        if let Some(max_collected_epoch) = self.max_collected_epoch {
188            assert!(epoch > max_collected_epoch);
189        }
190        self.max_collected_epoch = Some(epoch);
191        self.pending_barriers_to_complete.push_front(epoch);
192    }
193}