risingwave_meta/barrier/checkpoint/state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::epoch::Epoch;
use risingwave_pb::id::{ActorId, FragmentId, WorkerId};
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use tracing::warn;
use crate::MetaResult;
use crate::barrier::edge_builder::FragmentEdgeBuildResult;
use crate::barrier::info::{
    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
    SharedActorInfos, SubscriberType,
};
use crate::barrier::rpc::ControlStreamManager;
use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
use crate::controller::fragment::InflightFragmentInfo;
use crate::model::StreamJobActorsToCreate;

/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch`s of pending non-checkpoint barriers
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Running info of the database.
    pub(super) database_info: InflightDatabaseInfo,

    /// Whether the cluster is paused.
    is_paused: bool,
}

impl BarrierWorkerState {
    pub(super) fn new(database_id: DatabaseId, shared_actor_infos: SharedActorInfos) -> Self {
        Self {
            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
            pending_non_checkpoint_barriers: vec![],
            database_info: InflightDatabaseInfo::empty(database_id, shared_actor_infos),
            is_paused: false,
        }
    }

    pub fn recovery(
        database_id: DatabaseId,
        shared_actor_infos: SharedActorInfos,
        in_flight_prev_epoch: TracedEpoch,
        jobs: impl Iterator<Item = InflightStreamingJobInfo>,
        is_paused: bool,
    ) -> Self {
        Self {
            in_flight_prev_epoch,
            pending_non_checkpoint_barriers: vec![],
            database_info: InflightDatabaseInfo::recover(database_id, jobs, shared_actor_infos),
            is_paused,
        }
    }

    pub fn is_paused(&self) -> bool {
        self.is_paused
    }

    fn set_is_paused(&mut self, is_paused: bool) {
        if self.is_paused != is_paused {
            tracing::info!(
                currently_paused = self.is_paused,
                newly_paused = is_paused,
                "update paused state"
            );
            self.is_paused = is_paused;
        }
    }

    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
        &self.in_flight_prev_epoch
    }

    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
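    ///
    /// Returns `None` when the database has no inflight job and the command does not create a
    /// new streaming job, in which case no barrier needs to be injected.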
    pub fn next_barrier_info(
        &mut self,
        command: Option<&Command>,
        is_checkpoint: bool,
        curr_epoch: TracedEpoch,
    ) -> Option<BarrierInfo> {
        if self.database_info.is_empty()
            && !matches!(&command, Some(Command::CreateStreamingJob { .. }))
        {
            return None;
        };
        assert!(
            self.in_flight_prev_epoch.value() < curr_epoch.value(),
            "curr epoch regress. {} >= {}",
            self.in_flight_prev_epoch.value(),
            curr_epoch.value()
        );
        let prev_epoch = self.in_flight_prev_epoch.clone();
        self.in_flight_prev_epoch = curr_epoch.clone();
        self.pending_non_checkpoint_barriers
            .push(prev_epoch.value().0);
        let kind = if is_checkpoint {
            let epochs = take(&mut self.pending_non_checkpoint_barriers);
            BarrierKind::Checkpoint(epochs)
        } else {
            BarrierKind::Barrier
        };
        Some(BarrierInfo {
            prev_epoch,
            curr_epoch,
            kind,
        })
    }
}

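/// Info derived by [`BarrierWorkerState::apply_command`] for injecting the next barrier.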
pub(super) struct ApplyCommandInfo {
    /// Actors to collect the barrier from, grouped by worker node.
    pub node_actors: HashMap<WorkerId, HashSet<ActorId>>,
    /// New actors to create for the command, if any.
    pub actors_to_create: Option<StreamJobActorsToCreate>,
    /// The maximum subscription retention of each subscribed upstream materialized view table.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State tables to commit at this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    /// State tables to sync at this barrier, resolved after the command's fragment changes are applied.
    pub table_ids_to_sync: HashSet<TableId>,
    /// Finished snapshot backfill jobs that the barrier should wait for.
    pub jobs_to_wait: HashSet<JobId>,
    /// The barrier mutation derived from the command.
    pub mutation: Option<Mutation>,
}

impl BarrierWorkerState {
    /// Applies the given command and returns the [`ApplyCommandInfo`] for the next barrier.
    /// The returned actor info includes the actors newly added by the command; actors being
    /// dropped are removed from the state only after the info is resolved.
    pub(super) fn apply_command(
        &mut self,
        command: Option<&Command>,
        finished_snapshot_backfill_job_fragments: HashMap<
            JobId,
            HashMap<FragmentId, InflightFragmentInfo>,
        >,
        edges: &mut Option<FragmentEdgeBuildResult>,
        control_stream_manager: &ControlStreamManager,
    ) -> MetaResult<ApplyCommandInfo> {
        // Pre-apply the fragment changes of the command. Snapshot backfill jobs are skipped here:
        // their fragment infos are added to the database only when the job is merged back below.
        let post_apply_changes = if let Some(Command::CreateStreamingJob {
            job_type: CreateStreamingJobType::SnapshotBackfill(_),
            ..
        }) = command
        {
            None
        } else if let Some((new_job, fragment_changes)) =
            command.and_then(Command::fragment_changes)
        {
            Some(self.database_info.pre_apply(new_job, fragment_changes))
        } else {
            None
        };

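        // Register the subscribers introduced by the command: a new subscription, or a snapshot
        // backfill job subscribing to its upstream materialized views.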
        match &command {
            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(*retention_second),
                );
            }
            Some(Command::CreateStreamingJob {
                info,
                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
                ..
            }) => {
                for upstream_mv_table_id in snapshot_backfill_info
                    .upstream_mv_table_id_to_backfill_epoch
                    .keys()
                {
                    self.database_info.register_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        info.streaming_job.id().as_subscriber_id(),
                        SubscriberType::SnapshotBackfill,
                    );
                }
            }
            _ => {}
        };

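        // Resolve the tables and actors for this barrier after `pre_apply`, so that newly added
        // actors are included, and before `post_apply`, so that actors being dropped are still
        // collected from.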
        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
        let actors_to_create = command.as_ref().and_then(|command| {
            command.actors_to_create(&self.database_info, edges, control_stream_manager)
        });
        let node_actors =
            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());

        if let Some(post_apply_changes) = post_apply_changes {
            self.database_info.post_apply(post_apply_changes);
        }

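        // When merging finished snapshot backfill jobs, move their fragment infos into the
        // database as existing jobs and record them as jobs to wait for.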
        let mut jobs_to_wait = HashSet::new();
        if let Some(Command::MergeSnapshotBackfillStreamingJobs(jobs_to_merge)) = command {
            assert!(
                jobs_to_merge
                    .keys()
                    .all(|job_id| finished_snapshot_backfill_job_fragments.contains_key(job_id))
            );
            for (job_id, job_fragments) in finished_snapshot_backfill_job_fragments {
                assert!(jobs_to_merge.contains_key(&job_id));
                jobs_to_wait.insert(job_id);
                table_ids_to_commit.extend(InflightFragmentInfo::existing_table_ids(
                    job_fragments.values(),
                ));
                self.database_info.add_existing(InflightStreamingJobInfo {
                    job_id,
                    fragment_infos: job_fragments,
                    subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
                    status: CreateStreamingJobStatus::Created,
                    cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
                });
            }
        } else {
            assert!(finished_snapshot_backfill_job_fragments.is_empty());
        }

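        // Unregister the subscribers removed by the command: a dropped subscription, or the
        // snapshot backfill subscriptions of the jobs being merged back.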
        match &command {
            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
            }
            Some(Command::MergeSnapshotBackfillStreamingJobs(snapshot_backfill_jobs)) => {
                for (snapshot_backfill_job_id, upstream_tables) in snapshot_backfill_jobs {
                    for upstream_mv_table_id in upstream_tables {
                        assert_matches!(
                            self.database_info.unregister_subscriber(
                                upstream_mv_table_id.as_job_id(),
                                snapshot_backfill_job_id.as_subscriber_id()
                            ),
                            Some(SubscriberType::SnapshotBackfill)
                        );
                    }
                }
            }
            _ => {}
        }

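        // Track pause/resume from the command. Note that the mutation below is generated against
        // the paused state prior to this command.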
        let prev_is_paused = self.is_paused();
        let curr_is_paused = match command {
            Some(Command::Pause) => true,
            Some(Command::Resume) => false,
            _ => prev_is_paused,
        };
        self.set_is_paused(curr_is_paused);

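        // The tables to sync are resolved from the current fragment infos, i.e. after the
        // command's fragment changes have been post-applied above, unlike `table_ids_to_commit`,
        // which is resolved before `post_apply`.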
        let table_ids_to_sync =
            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()).collect();

        let mutation = if let Some(c) = &command {
            c.to_mutation(
                prev_is_paused,
                edges,
                control_stream_manager,
                &mut self.database_info,
            )?
        } else {
            None
        };

        Ok(ApplyCommandInfo {
            node_actors,
            actors_to_create,
            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
            table_ids_to_commit,
            table_ids_to_sync,
            jobs_to_wait,
            mutation,
        })
    }
}