risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::mem::take;
17
18use risingwave_common::catalog::TableId;
19use risingwave_common::util::epoch::Epoch;
20
21use crate::barrier::info::{BarrierInfo, InflightDatabaseInfo, InflightSubscriptionInfo};
22use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
23
24/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
25pub(crate) struct BarrierWorkerState {
26    /// The last sent `prev_epoch`
27    ///
28    /// There's no need to persist this field. On recovery, we will restore this from the latest
29    /// committed snapshot in `HummockManager`.
30    in_flight_prev_epoch: TracedEpoch,
31
32    /// The `prev_epoch` of pending non checkpoint barriers
33    pending_non_checkpoint_barriers: Vec<u64>,
34
35    /// Inflight running actors info.
36    pub(super) inflight_graph_info: InflightDatabaseInfo,
37
38    pub(super) inflight_subscription_info: InflightSubscriptionInfo,
39
40    /// Whether the cluster is paused.
41    is_paused: bool,
42}
43
44impl BarrierWorkerState {
45    pub(super) fn new() -> Self {
46        Self {
47            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
48            pending_non_checkpoint_barriers: vec![],
49            inflight_graph_info: InflightDatabaseInfo::empty(),
50            inflight_subscription_info: InflightSubscriptionInfo::default(),
51            is_paused: false,
52        }
53    }
54
55    pub fn recovery(
56        in_flight_prev_epoch: TracedEpoch,
57        inflight_graph_info: InflightDatabaseInfo,
58        inflight_subscription_info: InflightSubscriptionInfo,
59        is_paused: bool,
60    ) -> Self {
61        Self {
62            in_flight_prev_epoch,
63            pending_non_checkpoint_barriers: vec![],
64            inflight_graph_info,
65            inflight_subscription_info,
66            is_paused,
67        }
68    }
69
70    pub fn is_paused(&self) -> bool {
71        self.is_paused
72    }
73
74    fn set_is_paused(&mut self, is_paused: bool) {
75        if self.is_paused != is_paused {
76            tracing::info!(
77                currently_paused = self.is_paused,
78                newly_paused = is_paused,
79                "update paused state"
80            );
81            self.is_paused = is_paused;
82        }
83    }
84
85    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
86        &self.in_flight_prev_epoch
87    }
88
89    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
90    pub fn next_barrier_info(
91        &mut self,
92        command: Option<&Command>,
93        is_checkpoint: bool,
94        curr_epoch: TracedEpoch,
95    ) -> Option<BarrierInfo> {
96        if self.inflight_graph_info.is_empty()
97            && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
98        {
99            return None;
100        };
101        assert!(
102            self.in_flight_prev_epoch.value() < curr_epoch.value(),
103            "curr epoch regress. {} > {}",
104            self.in_flight_prev_epoch.value(),
105            curr_epoch.value()
106        );
107        let prev_epoch = self.in_flight_prev_epoch.clone();
108        self.in_flight_prev_epoch = curr_epoch.clone();
109        self.pending_non_checkpoint_barriers
110            .push(prev_epoch.value().0);
111        let kind = if is_checkpoint {
112            let epochs = take(&mut self.pending_non_checkpoint_barriers);
113            BarrierKind::Checkpoint(epochs)
114        } else {
115            BarrierKind::Barrier
116        };
117        Some(BarrierInfo {
118            prev_epoch,
119            curr_epoch,
120            kind,
121        })
122    }
123
124    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
125    /// will be removed from the state after the info get resolved.
126    ///
127    /// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`, `prev_is_paused`)
128    pub fn apply_command(
129        &mut self,
130        command: Option<&Command>,
131    ) -> (
132        InflightDatabaseInfo,
133        InflightSubscriptionInfo,
134        HashSet<TableId>,
135        HashSet<TableId>,
136        bool,
137    ) {
138        // update the fragment_infos outside pre_apply
139        let fragment_changes = if let Some(Command::CreateStreamingJob {
140            job_type: CreateStreamingJobType::SnapshotBackfill(_),
141            ..
142        }) = command
143        {
144            None
145        } else if let Some(fragment_changes) = command.and_then(Command::fragment_changes) {
146            self.inflight_graph_info.pre_apply(&fragment_changes);
147            Some(fragment_changes)
148        } else {
149            None
150        };
151        if let Some(command) = &command {
152            self.inflight_subscription_info.pre_apply(command);
153        }
154
155        let info = self.inflight_graph_info.clone();
156        let subscription_info = self.inflight_subscription_info.clone();
157
158        if let Some(fragment_changes) = fragment_changes {
159            self.inflight_graph_info.post_apply(&fragment_changes);
160        }
161
162        let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
163        let mut jobs_to_wait = HashSet::new();
164        if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
165            for (table_id, (_, graph_info)) in jobs_to_merge {
166                jobs_to_wait.insert(*table_id);
167                table_ids_to_commit.extend(graph_info.existing_table_ids());
168                self.inflight_graph_info.extend(graph_info.clone());
169            }
170        }
171
172        if let Some(command) = command {
173            self.inflight_subscription_info.post_apply(command);
174        }
175
176        let prev_is_paused = self.is_paused();
177        let curr_is_paused = match command {
178            Some(Command::Pause) => true,
179            Some(Command::Resume) => false,
180            _ => prev_is_paused,
181        };
182        self.set_is_paused(curr_is_paused);
183
184        (
185            info,
186            subscription_info,
187            table_ids_to_commit,
188            jobs_to_wait,
189            prev_is_paused,
190        )
191    }
192}