risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::mem::take;
17
18use risingwave_common::catalog::{DatabaseId, TableId};
19use risingwave_common::util::epoch::Epoch;
20
21use crate::barrier::info::{
22    BarrierInfo, InflightDatabaseInfo, InflightStreamingJobInfo, InflightSubscriptionInfo,
23    SharedActorInfos,
24};
25use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
26
27/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
28pub(crate) struct BarrierWorkerState {
29    /// The last sent `prev_epoch`
30    ///
31    /// There's no need to persist this field. On recovery, we will restore this from the latest
32    /// committed snapshot in `HummockManager`.
33    in_flight_prev_epoch: TracedEpoch,
34
35    /// The `prev_epoch` of pending non checkpoint barriers
36    pending_non_checkpoint_barriers: Vec<u64>,
37
38    /// Inflight running actors info.
39    pub(super) inflight_graph_info: InflightDatabaseInfo,
40
41    pub(super) inflight_subscription_info: InflightSubscriptionInfo,
42
43    /// Whether the cluster is paused.
44    is_paused: bool,
45}
46
47impl BarrierWorkerState {
48    pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
49        Self {
50            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
51            pending_non_checkpoint_barriers: vec![],
52            inflight_graph_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
53            inflight_subscription_info: InflightSubscriptionInfo::default(),
54            is_paused: false,
55        }
56    }
57
58    pub fn recovery(
59        database_id: DatabaseId,
60        shared_actor_infos: SharedActorInfos,
61        in_flight_prev_epoch: TracedEpoch,
62        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
63        inflight_subscription_info: InflightSubscriptionInfo,
64        is_paused: bool,
65    ) -> Self {
66        Self {
67            in_flight_prev_epoch,
68            pending_non_checkpoint_barriers: vec![],
69            inflight_graph_info: InflightDatabaseInfo::recover(
70                database_id,
71                jobs,
72                shared_actor_infos,
73            ),
74            inflight_subscription_info,
75            is_paused,
76        }
77    }
78
79    pub fn is_paused(&self) -> bool {
80        self.is_paused
81    }
82
83    fn set_is_paused(&mut self, is_paused: bool) {
84        if self.is_paused != is_paused {
85            tracing::info!(
86                currently_paused = self.is_paused,
87                newly_paused = is_paused,
88                "update paused state"
89            );
90            self.is_paused = is_paused;
91        }
92    }
93
94    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
95        &self.in_flight_prev_epoch
96    }
97
98    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
99    pub fn next_barrier_info(
100        &mut self,
101        command: Option<&Command>,
102        is_checkpoint: bool,
103        curr_epoch: TracedEpoch,
104    ) -> Option<BarrierInfo> {
105        if self.inflight_graph_info.is_empty()
106            && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
107        {
108            return None;
109        };
110        assert!(
111            self.in_flight_prev_epoch.value() < curr_epoch.value(),
112            "curr epoch regress. {} > {}",
113            self.in_flight_prev_epoch.value(),
114            curr_epoch.value()
115        );
116        let prev_epoch = self.in_flight_prev_epoch.clone();
117        self.in_flight_prev_epoch = curr_epoch.clone();
118        self.pending_non_checkpoint_barriers
119            .push(prev_epoch.value().0);
120        let kind = if is_checkpoint {
121            let epochs = take(&mut self.pending_non_checkpoint_barriers);
122            BarrierKind::Checkpoint(epochs)
123        } else {
124            BarrierKind::Barrier
125        };
126        Some(BarrierInfo {
127            prev_epoch,
128            curr_epoch,
129            kind,
130        })
131    }
132
133    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
134    /// will be removed from the state after the info get resolved.
135    ///
136    /// Return (`graph_info`, `subscription_info`, `table_ids_to_commit`, `jobs_to_wait`, `prev_is_paused`)
137    pub fn apply_command(
138        &mut self,
139        command: Option<&Command>,
140    ) -> (
141        InflightDatabaseInfo,
142        InflightSubscriptionInfo,
143        HashSet<TableId>,
144        HashSet<TableId>,
145        bool,
146    ) {
147        // update the fragment_infos outside pre_apply
148        let fragment_changes = if let Some(Command::CreateStreamingJob {
149            job_type: CreateStreamingJobType::SnapshotBackfill(_),
150            ..
151        }) = command
152        {
153            None
154        } else if let Some(fragment_changes) = command.and_then(Command::fragment_changes) {
155            self.inflight_graph_info.pre_apply(&fragment_changes);
156            Some(fragment_changes)
157        } else {
158            None
159        };
160        if let Some(command) = &command {
161            self.inflight_subscription_info.pre_apply(command);
162        }
163
164        let info = self.inflight_graph_info.clone();
165        let subscription_info = self.inflight_subscription_info.clone();
166
167        if let Some(fragment_changes) = fragment_changes {
168            self.inflight_graph_info.post_apply(&fragment_changes);
169        }
170
171        let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
172        let mut jobs_to_wait = HashSet::new();
173        if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
174            for (table_id, (_, graph_info)) in jobs_to_merge {
175                jobs_to_wait.insert(*table_id);
176                table_ids_to_commit.extend(graph_info.existing_table_ids());
177                self.inflight_graph_info.add_existing(graph_info.clone());
178            }
179        }
180
181        if let Some(command) = command {
182            self.inflight_subscription_info.post_apply(command);
183        }
184
185        let prev_is_paused = self.is_paused();
186        let curr_is_paused = match command {
187            Some(Command::Pause) => true,
188            Some(Command::Resume) => false,
189            _ => prev_is_paused,
190        };
191        self.set_is_paused(curr_is_paused);
192
193        (
194            info,
195            subscription_info,
196            table_ids_to_commit,
197            jobs_to_wait,
198            prev_is_paused,
199        )
200    }
201}