risingwave_meta/barrier/checkpoint/state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;

use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::id::{ActorId, FragmentId, WorkerId};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use tracing::warn;

use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SharedActorInfos, SubscriberType,
};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;

/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers
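    ///
    /// Drained into `BarrierKind::Checkpoint` by the next checkpoint barrier in
    /// [`Self::next_barrier_info`].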
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Running info of database.
    pub(super) database_info: InflightDatabaseInfo,

    /// Whether the cluster is paused.
    is_paused: bool,
}

impl BarrierWorkerState {
    pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            is_paused: false,
        }
    }

    pub fn recovery(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        in_flight_prev_epoch: TracedEpoch,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        is_paused: bool,
    ) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            database_info: InflightDatabaseInfo::recover(database_id, jobs, shared_actor_infos),
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
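    ///
    /// Returns `None` when the database has no running job and the command does not create one.
    /// Every issued barrier pushes its `prev_epoch` into the pending list, which is drained into
    /// `BarrierKind::Checkpoint` by the next checkpoint barrier.
    ///
    /// Illustrative sketch only (not compiled): assumes the database is non-empty and that the
    /// hypothetical `next_epoch()` yields strictly increasing `TracedEpoch`s, as the real caller
    /// guarantees.
    ///
    /// ```ignore
    /// let b1 = state.next_barrier_info(None, false, next_epoch()).unwrap();
    /// let b2 = state.next_barrier_info(None, false, next_epoch()).unwrap();
    /// // The checkpoint barrier carries the prev_epochs of b1, b2 and itself.
    /// let b3 = state.next_barrier_info(None, true, next_epoch()).unwrap();
    /// assert!(matches!(&b3.kind, BarrierKind::Checkpoint(epochs) if epochs.len() == 3));
    /// ```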
    pub fn next_barrier_info(
        &mut self,
        command: Option<&Command>,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> Option<BarrierInfo> {
        if self.database_info.is_empty()
            && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
        {
            return None;
        };
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress. {} > {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        Some(BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        })
    }
}

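/// Info derived from applying a command in [`BarrierWorkerState::apply_command`].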
pub(super) struct ApplyCommandInfo {
    pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    pub actors_to_create: Option<StreamJobActorsToCreate>,
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    pub table_ids_to_commit: HashSet<TableId>,
    pub table_ids_to_sync: HashSet<TableId>,
    pub jobs_to_wait: HashSet<JobId>,
    pub mutation: Option<Mutation>,
}

impl BarrierWorkerState {
    /// Applies the given command and returns the derived [`ApplyCommandInfo`]. The returned info
    /// already includes the actors newly added by the command; the dropped actors are removed
    /// from the state only after the info has been resolved.
    pub(super) fn apply_command(
        &mut self,
        command: Option<&Command>,
        finished_snapshot_backfill_job_fragments: HashMap<
            JobId,
            HashMap<FragmentId, InflightFragmentInfo>,
        >,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> ApplyCommandInfo {
        // Pre-apply the fragment changes of the command. Snapshot-backfill creations are skipped
        // here: their fragment infos are updated outside `pre_apply`, only after the backfill
        // finishes (see `add_existing` below).
        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            None
        } else if let Some((new_job_id, fragment_changes)) =
            command.and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job_id, fragment_changes))
        } else {
            None
        };

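        // Register the subscribers introduced by the command: explicit subscriptions, and the
        // implicit per-upstream-table subscribers of a snapshot-backfill creation.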
        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

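        // Collect the table ids to commit, the actors to create, and the actors to collect from
        // the info that already contains the newly added actors; dropped actors are removed in
        // `post_apply` below.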
        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, edges, control_stream_manager)
        });
        let node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

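        // When merging finished snapshot-backfill jobs, also commit their tables, track them as
        // jobs to wait for, and add them to the database info as created jobs.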
        let mut jobs_to_wait = HashSet::new();
        if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
            assert!(
                jobs_to_merge
                    .keys()
                    .all(|job_id| finished_snapshot_backfill_job_fragments.contains_key(job_id))
            );
            for (job_id, job_fragments) in finished_snapshot_backfill_job_fragments {
                assert!(jobs_to_merge.contains_key(&job_id));
                jobs_to_wait.insert(job_id);
                table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
                    job_fragments.values(),
                ));
                self.database_info.add_existing(InflightStreamingJobInfo {
                    job_id,
                    fragment_infos: job_fragments,
                    subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
                    status: CreateStreamingJobStatus::Created,
                });
            }
        } else {
            assert!(finished_snapshot_backfill_job_fragments.is_empty());
        }

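        // Unregister the subscribers removed by the command: dropped subscriptions, and the
        // snapshot-backfill subscribers of the jobs that have just been merged.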
        match &command {
            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
            }
            Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
                for (snapshot_backfill_job_id, upstream_tables) in snapshot_backfill_jobs {
                    for upstream_mv_table_id in upstream_tables {
                        assert_matches!(
                            self.database_info.unregister_subscriber(
                                upstream_mv_table_id.as_job_id(),
                                snapshot_backfill_job_id.as_subscriber_id()
                            ),
                            Some(SubscriberType::SnapshotBackfill)
                        );
                    }
                }
            }
            _ => {}
        }

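        // Update the paused state according to `Pause` / `Resume` commands.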
        let prev_is_paused = self.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.set_is_paused(curr_is_paused);

        let table_ids_to_sync =
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()).collect();

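        // Build the barrier mutation using the pause state prior to this command.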
        let mutation = command
            .as_ref()
            .and_then(|c| c.to_mutation(prev_is_paused, edges, control_stream_manager));

        ApplyCommandInfo {
            node_actors,
            actors_to_create,
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            table_ids_to_sync,
            jobs_to_wait,
            mutation,
        }
    }
}