risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18
19use risingwave_common::catalog::{DatabaseId, TableId};
20use risingwave_common::id::JobId;
21use risingwave_common::util::epoch::Epoch;
22use tracing::warn;
23
24use crate::barrier::info::{
25    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
26    SharedActorInfos, SubscriberType,
27};
28use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
29use crate::controller::fragment::InflightFragmentInfo;
30
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(crate) struct BarrierWorkerState {
    /// The epoch that the next barrier will carry as its `prev_epoch`. After a barrier is
    /// produced by `next_barrier_info`, this is that barrier's `curr_epoch`.
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` values of the barriers produced since the last checkpoint barrier.
    ///
    /// The list is moved into the next checkpoint barrier's `BarrierKind::Checkpoint` (which
    /// also includes that checkpoint barrier's own `prev_epoch`) and left empty afterwards.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Inflight running actors info.
    pub(super) inflight_graph_info: InflightDatabaseInfo,

    /// Whether the state is currently paused. Set by `Command::Pause` and cleared by
    /// `Command::Resume` in `apply_command`.
    is_paused: bool,
}
48
49impl BarrierWorkerState {
50    pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
51        Self {
52            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
53            pending_non_checkpoint_barriers: vec![],
54            inflight_graph_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
55            is_paused: false,
56        }
57    }
58
59    pub fn recovery(
60        database_id: DatabaseId,
61        shared_actor_infos: SharedActorInfos,
62        in_flight_prev_epoch: TracedEpoch,
63        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
64        is_paused: bool,
65    ) -> Self {
66        Self {
67            in_flight_prev_epoch,
68            pending_non_checkpoint_barriers: vec![],
69            inflight_graph_info: InflightDatabaseInfo::recover(
70                database_id,
71                jobs,
72                shared_actor_infos,
73            ),
74            is_paused,
75        }
76    }
77
78    pub fn is_paused(&self) -> bool {
79        self.is_paused
80    }
81
82    fn set_is_paused(&mut self, is_paused: bool) {
83        if self.is_paused != is_paused {
84            tracing::info!(
85                currently_paused = self.is_paused,
86                newly_paused = is_paused,
87                "update paused state"
88            );
89            self.is_paused = is_paused;
90        }
91    }
92
93    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
94        &self.in_flight_prev_epoch
95    }
96
97    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
98    pub fn next_barrier_info(
99        &mut self,
100        command: Option<&Command>,
101        is_checkpoint: bool,
102        curr_epoch: TracedEpoch,
103    ) -> Option<BarrierInfo> {
104        if self.inflight_graph_info.is_empty()
105            && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
106        {
107            return None;
108        };
109        assert!(
110            self.in_flight_prev_epoch.value() < curr_epoch.value(),
111            "curr epoch regress. {} > {}",
112            self.in_flight_prev_epoch.value(),
113            curr_epoch.value()
114        );
115        let prev_epoch = self.in_flight_prev_epoch.clone();
116        self.in_flight_prev_epoch = curr_epoch.clone();
117        self.pending_non_checkpoint_barriers
118            .push(prev_epoch.value().0);
119        let kind = if is_checkpoint {
120            let epochs = take(&mut self.pending_non_checkpoint_barriers);
121            BarrierKind::Checkpoint(epochs)
122        } else {
123            BarrierKind::Barrier
124        };
125        Some(BarrierInfo {
126            prev_epoch,
127            curr_epoch,
128            kind,
129        })
130    }
131
132    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
133    /// will be removed from the state after the info get resolved.
134    ///
135    /// Return (`graph_info`, `mv_subscription_max_retention`, `table_ids_to_commit`, `jobs_to_wait`, `prev_is_paused`)
136    pub fn apply_command(
137        &mut self,
138        command: Option<&Command>,
139    ) -> (
140        InflightDatabaseInfo,
141        HashMap<TableId, u64>,
142        HashSet<TableId>,
143        HashSet<JobId>,
144        bool,
145    ) {
146        // update the fragment_infos outside pre_apply
147        let fragment_changes = if let Some(Command::CreateStreamingJob {
148            job_type: CreateStreamingJobType::SnapshotBackfill(_),
149            ..
150        }) = command
151        {
152            None
153        } else if let Some((new_job_id, fragment_changes)) =
154            command.and_then(Command::fragment_changes)
155        {
156            self.inflight_graph_info
157                .pre_apply(new_job_id, &fragment_changes);
158            Some(fragment_changes)
159        } else {
160            None
161        };
162
163        match &command {
164            Some(Command::CreateSubscription {
165                subscription_id,
166                upstream_mv_table_id,
167                retention_second,
168            }) => {
169                self.inflight_graph_info.register_subscriber(
170                    upstream_mv_table_id.as_job_id(),
171                    subscription_id.as_raw_id(),
172                    SubscriberType::Subscription(*retention_second),
173                );
174            }
175            Some(Command::CreateStreamingJob {
176                info,
177                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
178                ..
179            }) => {
180                for upstream_mv_table_id in snapshot_backfill_info
181                    .upstream_mv_table_id_to_backfill_epoch
182                    .keys()
183                {
184                    self.inflight_graph_info.register_subscriber(
185                        upstream_mv_table_id.as_job_id(),
186                        info.streaming_job.id().as_raw_id(),
187                        SubscriberType::SnapshotBackfill,
188                    );
189                }
190            }
191            _ => {}
192        };
193
194        let info = self.inflight_graph_info.clone();
195
196        if let Some(fragment_changes) = fragment_changes {
197            self.inflight_graph_info.post_apply(&fragment_changes);
198        }
199
200        let mut table_ids_to_commit: HashSet<_> = info.existing_table_ids().collect();
201        let mut jobs_to_wait = HashSet::new();
202        if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
203            for (&job_id, (_, graph_info)) in jobs_to_merge {
204                jobs_to_wait.insert(job_id);
205                table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
206                    graph_info.values(),
207                ));
208                self.inflight_graph_info.add_existing(InflightStreamingJobInfo {
209                    job_id,
210                    fragment_infos: graph_info.clone(),
211                    subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
212                    status: CreateStreamingJobStatus::Created,
213                });
214            }
215        }
216
217        match &command {
218            Some(Command::DropSubscription {
219                subscription_id,
220                upstream_mv_table_id,
221            }) => {
222                if self
223                    .inflight_graph_info
224                    .unregister_subscriber(
225                        upstream_mv_table_id.as_job_id(),
226                        subscription_id.as_raw_id(),
227                    )
228                    .is_none()
229                {
230                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
231                }
232            }
233            Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
234                for (snapshot_backfill_job_id, (upstream_mv_table_ids, _)) in snapshot_backfill_jobs
235                {
236                    for upstream_mv_table_id in upstream_mv_table_ids {
237                        assert_matches!(
238                            self.inflight_graph_info.unregister_subscriber(
239                                upstream_mv_table_id.as_job_id(),
240                                snapshot_backfill_job_id.as_raw_id()
241                            ),
242                            Some(SubscriberType::SnapshotBackfill)
243                        );
244                    }
245                }
246            }
247            _ => {}
248        }
249
250        let prev_is_paused = self.is_paused();
251        let curr_is_paused = match command {
252            Some(Command::Pause) => true,
253            Some(Command::Resume) => false,
254            _ => prev_is_paused,
255        };
256        self.set_is_paused(curr_is_paused);
257
258        (
259            info,
260            self.inflight_graph_info.max_subscription_retention(),
261            table_ids_to_commit,
262            jobs_to_wait,
263            prev_is_paused,
264        )
265    }
266}