// risingwave_meta/barrier/checkpoint/state.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::assert_matches::assert_matches;
16use std::collections::{HashMap, HashSet};
17use std::mem::take;
18
19use risingwave_common::catalog::TableId;
20use risingwave_common::id::JobId;
21use risingwave_common::util::epoch::Epoch;
22use risingwave_pb::hummock::HummockVersionStats;
23use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
24use risingwave_pb::stream_plan::barrier_mutation::PbMutation;
25use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
26use risingwave_pb::stream_plan::{
27    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
28};
29use tracing::warn;
30
31use crate::MetaResult;
32use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
33use crate::barrier::edge_builder::FragmentEdgeBuilder;
34use crate::barrier::info::{
35    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
36};
37use crate::barrier::rpc::ControlStreamManager;
38use crate::barrier::utils::NodeToCollect;
39use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
40use crate::controller::fragment::InflightFragmentInfo;
41use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};
42
43/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
44pub(in crate::barrier) struct BarrierWorkerState {
45    /// The last sent `prev_epoch`
46    ///
47    /// There's no need to persist this field. On recovery, we will restore this from the latest
48    /// committed snapshot in `HummockManager`.
49    in_flight_prev_epoch: TracedEpoch,
50
51    /// The `prev_epoch` of pending non checkpoint barriers
52    pending_non_checkpoint_barriers: Vec<u64>,
53
54    /// Whether the cluster is paused.
55    is_paused: bool,
56}
57
58impl BarrierWorkerState {
59    pub(super) fn new() -> Self {
60        Self {
61            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
62            pending_non_checkpoint_barriers: vec![],
63            is_paused: false,
64        }
65    }
66
67    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
68        Self {
69            in_flight_prev_epoch,
70            pending_non_checkpoint_barriers: vec![],
71            is_paused,
72        }
73    }
74
75    pub fn is_paused(&self) -> bool {
76        self.is_paused
77    }
78
79    fn set_is_paused(&mut self, is_paused: bool) {
80        if self.is_paused != is_paused {
81            tracing::info!(
82                currently_paused = self.is_paused,
83                newly_paused = is_paused,
84                "update paused state"
85            );
86            self.is_paused = is_paused;
87        }
88    }
89
90    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
91        &self.in_flight_prev_epoch
92    }
93
94    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
95    pub fn next_barrier_info(
96        &mut self,
97        is_checkpoint: bool,
98        curr_epoch: TracedEpoch,
99    ) -> BarrierInfo {
100        assert!(
101            self.in_flight_prev_epoch.value() < curr_epoch.value(),
102            "curr epoch regress. {} > {}",
103            self.in_flight_prev_epoch.value(),
104            curr_epoch.value()
105        );
106        let prev_epoch = self.in_flight_prev_epoch.clone();
107        self.in_flight_prev_epoch = curr_epoch.clone();
108        self.pending_non_checkpoint_barriers
109            .push(prev_epoch.value().0);
110        let kind = if is_checkpoint {
111            let epochs = take(&mut self.pending_non_checkpoint_barriers);
112            BarrierKind::Checkpoint(epochs)
113        } else {
114            BarrierKind::Barrier
115        };
116        BarrierInfo {
117            prev_epoch,
118            curr_epoch,
119            kind,
120        }
121    }
122}
123
124pub(super) struct ApplyCommandInfo {
125    pub mv_subscription_max_retention: HashMap<TableId, u64>,
126    pub table_ids_to_commit: HashSet<TableId>,
127    pub jobs_to_wait: HashSet<JobId>,
128    pub node_to_collect: NodeToCollect,
129    pub command: Option<Command>,
130}
131
132impl DatabaseCheckpointControl {
133    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
134    /// will be removed from the state after the info get resolved.
135    pub(super) fn apply_command(
136        &mut self,
137        mut command: Option<Command>,
138        barrier_info: &BarrierInfo,
139        control_stream_manager: &mut ControlStreamManager,
140        hummock_version_stats: &HummockVersionStats,
141    ) -> MetaResult<ApplyCommandInfo> {
142        let mut edges = self
143            .database_info
144            .build_edge(command.as_ref(), &*control_stream_manager);
145
146        // Insert newly added snapshot backfill job
147        if let &mut Some(Command::CreateStreamingJob {
148            ref mut job_type,
149            ref mut info,
150            ref cross_db_snapshot_backfill_info,
151        }) = &mut command
152        {
153            match job_type {
154                CreateStreamingJobType::Normal | CreateStreamingJobType::SinkIntoTable(_) => {
155                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
156                        fill_snapshot_backfill_epoch(
157                            &mut fragment.nodes,
158                            None,
159                            cross_db_snapshot_backfill_info,
160                        )?;
161                    }
162                }
163                CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
164                    assert!(!self.state.is_paused());
165                    let snapshot_epoch = barrier_info.prev_epoch();
166                    // set snapshot epoch of upstream table for snapshot backfill
167                    for snapshot_backfill_epoch in snapshot_backfill_info
168                        .upstream_mv_table_id_to_backfill_epoch
169                        .values_mut()
170                    {
171                        assert_eq!(
172                            snapshot_backfill_epoch.replace(snapshot_epoch),
173                            None,
174                            "must not set previously"
175                        );
176                    }
177                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
178                        fill_snapshot_backfill_epoch(
179                            &mut fragment.nodes,
180                            Some(snapshot_backfill_info),
181                            cross_db_snapshot_backfill_info,
182                        )?;
183                    }
184                    let job_id = info.stream_job_fragments.stream_job_id();
185                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
186                        .upstream_mv_table_id_to_backfill_epoch
187                        .keys()
188                        .cloned()
189                        .collect();
190
191                    let job = CreatingStreamingJobControl::new(
192                        info,
193                        snapshot_backfill_upstream_tables,
194                        snapshot_epoch,
195                        hummock_version_stats,
196                        control_stream_manager,
197                        edges.as_mut().expect("should exist"),
198                    )?;
199
200                    self.database_info
201                        .shared_actor_infos
202                        .upsert(self.database_id, job.fragment_infos_with_job_id());
203
204                    self.creating_streaming_job_controls.insert(job_id, job);
205                }
206            }
207        }
208
209        // update the fragment_infos outside pre_apply
210        let post_apply_changes = if let Some(Command::CreateStreamingJob {
211            job_type: CreateStreamingJobType::SnapshotBackfill(_),
212            ..
213        }) = command
214        {
215            None
216        } else if let Some((new_job, fragment_changes)) =
217            command.as_ref().and_then(Command::fragment_changes)
218        {
219            Some(self.database_info.pre_apply(new_job, fragment_changes))
220        } else {
221            None
222        };
223
224        match &command {
225            Some(Command::CreateSubscription {
226                subscription_id,
227                upstream_mv_table_id,
228                retention_second,
229            }) => {
230                self.database_info.register_subscriber(
231                    upstream_mv_table_id.as_job_id(),
232                    subscription_id.as_subscriber_id(),
233                    SubscriberType::Subscription(*retention_second),
234                );
235            }
236            Some(Command::CreateStreamingJob {
237                info,
238                job_type: CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
239                ..
240            }) => {
241                for upstream_mv_table_id in snapshot_backfill_info
242                    .upstream_mv_table_id_to_backfill_epoch
243                    .keys()
244                {
245                    self.database_info.register_subscriber(
246                        upstream_mv_table_id.as_job_id(),
247                        info.streaming_job.id().as_subscriber_id(),
248                        SubscriberType::SnapshotBackfill,
249                    );
250                }
251            }
252            _ => {}
253        };
254
255        let mut table_ids_to_commit: HashSet<_> = self.database_info.existing_table_ids().collect();
256        let mut actors_to_create = command.as_ref().and_then(|command| {
257            command.actors_to_create(&self.database_info, &mut edges, control_stream_manager)
258        });
259        let mut node_actors =
260            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
261
262        if let Some(post_apply_changes) = post_apply_changes {
263            self.database_info.post_apply(post_apply_changes);
264        }
265
266        let prev_is_paused = self.state.is_paused();
267        let curr_is_paused = match command {
268            Some(Command::Pause) => true,
269            Some(Command::Resume) => false,
270            _ => prev_is_paused,
271        };
272        self.state.set_is_paused(curr_is_paused);
273
274        let mutation = if let Some(c) = &command {
275            c.to_mutation(
276                prev_is_paused,
277                &mut edges,
278                control_stream_manager,
279                &mut self.database_info,
280            )?
281        } else {
282            None
283        };
284
285        let mut finished_snapshot_backfill_jobs = HashSet::new();
286        let mutation = match mutation {
287            Some(mutation) => Some(mutation),
288            None => {
289                let mut finished_snapshot_backfill_job_info = HashMap::new();
290                if barrier_info.kind.is_checkpoint() {
291                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
292                        if creating_job.should_merge_to_upstream() {
293                            let info = creating_job
294                                .start_consume_upstream(control_stream_manager, barrier_info)?;
295                            finished_snapshot_backfill_job_info
296                                .try_insert(job_id, info)
297                                .expect("non-duplicated");
298                        }
299                    }
300                }
301
302                if !finished_snapshot_backfill_job_info.is_empty() {
303                    let actors_to_create = actors_to_create.get_or_insert_default();
304                    let mut subscriptions_to_drop = vec![];
305                    let mut dispatcher_update = vec![];
306                    let mut actor_splits = HashMap::new();
307                    for (job_id, info) in finished_snapshot_backfill_job_info {
308                        finished_snapshot_backfill_jobs.insert(job_id);
309                        subscriptions_to_drop.extend(
310                            info.snapshot_backfill_upstream_tables.iter().map(
311                                |upstream_table_id| PbSubscriptionUpstreamInfo {
312                                    subscriber_id: job_id.as_subscriber_id(),
313                                    upstream_mv_table_id: *upstream_table_id,
314                                },
315                            ),
316                        );
317                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
318                            assert_matches!(
319                                self.database_info.unregister_subscriber(
320                                    upstream_mv_table_id.as_job_id(),
321                                    job_id.as_subscriber_id()
322                                ),
323                                Some(SubscriberType::SnapshotBackfill)
324                            );
325                        }
326
327                        table_ids_to_commit.extend(
328                            info.fragment_infos
329                                .values()
330                                .flat_map(|fragment| fragment.state_table_ids.iter())
331                                .copied(),
332                        );
333
334                        let actor_len = info
335                            .fragment_infos
336                            .values()
337                            .map(|fragment| fragment.actors.len() as u64)
338                            .sum();
339                        let id_gen = GlobalActorIdGen::new(
340                            control_stream_manager.env.actor_id_generator(),
341                            actor_len,
342                        );
343                        let mut next_local_actor_id = 0;
344                        // mapping from old_actor_id to new_actor_id
345                        let actor_mapping: HashMap<_, _> = info
346                            .fragment_infos
347                            .values()
348                            .flat_map(|fragment| fragment.actors.keys())
349                            .map(|old_actor_id| {
350                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
351                                next_local_actor_id += 1;
352                                (*old_actor_id, new_actor_id.as_global_id())
353                            })
354                            .collect();
355                        let actor_mapping = &actor_mapping;
356                        let new_stream_actors: HashMap<_, _> = info
357                            .stream_actors
358                            .into_iter()
359                            .map(|(old_actor_id, mut actor)| {
360                                let new_actor_id = actor_mapping[&old_actor_id];
361                                actor.actor_id = new_actor_id;
362                                (new_actor_id, actor)
363                            })
364                            .collect();
365                        let new_fragment_info: HashMap<_, _> = info
366                            .fragment_infos
367                            .into_iter()
368                            .map(|(fragment_id, mut fragment)| {
369                                let actors = take(&mut fragment.actors);
370                                fragment.actors = actors
371                                    .into_iter()
372                                    .map(|(old_actor_id, actor)| {
373                                        let new_actor_id = actor_mapping[&old_actor_id];
374                                        (new_actor_id, actor)
375                                    })
376                                    .collect();
377                                (fragment_id, fragment)
378                            })
379                            .collect();
380                        actor_splits.extend(
381                            new_fragment_info
382                                .values()
383                                .flat_map(|fragment| &fragment.actors)
384                                .map(|(actor_id, actor)| {
385                                    (
386                                        *actor_id,
387                                        ConnectorSplits {
388                                            splits: actor
389                                                .splits
390                                                .iter()
391                                                .map(ConnectorSplit::from)
392                                                .collect(),
393                                        },
394                                    )
395                                }),
396                        );
397                        let mut edge_builder = FragmentEdgeBuilder::new(
398                            info.upstream_fragment_downstreams
399                                .keys()
400                                .map(|upstream_fragment_id| {
401                                    self.database_info.fragment(*upstream_fragment_id)
402                                })
403                                .chain(new_fragment_info.values()),
404                            control_stream_manager,
405                        );
406                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
407                        edge_builder.add_relations(&info.downstreams);
408                        let mut edges = edge_builder.build();
409                        let new_actors_to_create = edges.collect_actors_to_create(
410                            new_fragment_info.values().map(|fragment| {
411                                (
412                                    fragment.fragment_id,
413                                    &fragment.nodes,
414                                    fragment.actors.iter().map(|(actor_id, actor)| {
415                                        (&new_stream_actors[actor_id], actor.worker_id)
416                                    }),
417                                    [], // no initial subscriber for backfilling job
418                                )
419                            }),
420                        );
421                        dispatcher_update.extend(
422                            info.upstream_fragment_downstreams.keys().flat_map(
423                                |upstream_fragment_id| {
424                                    let new_actor_dispatchers = edges
425                                        .dispatchers
426                                        .remove(upstream_fragment_id)
427                                        .expect("should exist");
428                                    new_actor_dispatchers.into_iter().flat_map(
429                                        |(upstream_actor_id, dispatchers)| {
430                                            dispatchers.into_iter().map(move |dispatcher| {
431                                                PbDispatcherUpdate {
432                                                    actor_id: upstream_actor_id,
433                                                    dispatcher_id: dispatcher.dispatcher_id,
434                                                    hash_mapping: dispatcher.hash_mapping,
435                                                    removed_downstream_actor_id: dispatcher
436                                                        .downstream_actor_id
437                                                        .iter()
438                                                        .map(|new_downstream_actor_id| {
439                                                            actor_mapping
440                                                            .iter()
441                                                            .find_map(
442                                                                |(old_actor_id, new_actor_id)| {
443                                                                    (new_downstream_actor_id
444                                                                        == new_actor_id)
445                                                                        .then_some(*old_actor_id)
446                                                                },
447                                                            )
448                                                            .expect("should exist")
449                                                        })
450                                                        .collect(),
451                                                    added_downstream_actor_id: dispatcher
452                                                        .downstream_actor_id,
453                                                }
454                                            })
455                                        },
456                                    )
457                                },
458                            ),
459                        );
460                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
461                        for (worker_id, worker_actors) in new_actors_to_create {
462                            node_actors.entry(worker_id).or_default().extend(
463                                worker_actors.values().flat_map(|(_, actors, _)| {
464                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
465                                }),
466                            );
467                            actors_to_create
468                                .entry(worker_id)
469                                .or_default()
470                                .extend(worker_actors);
471                        }
472                        self.database_info.add_existing(InflightStreamingJobInfo {
473                            job_id,
474                            fragment_infos: new_fragment_info,
475                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
476                            status: CreateStreamingJobStatus::Created,
477                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
478                        });
479                    }
480
481                    Some(PbMutation::Update(PbUpdateMutation {
482                        dispatcher_update,
483                        merge_update: vec![], // no upstream update on existing actors
484                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
485                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
486                        actor_splits,
487                        actor_new_dispatchers: Default::default(), // no new dispatcher
488                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
489                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
490                        subscriptions_to_drop,
491                    }))
492                } else {
493                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
494                    if fragment_ids.is_empty() {
495                        None
496                    } else {
497                        Some(PbMutation::StartFragmentBackfill(
498                            PbStartFragmentBackfillMutation { fragment_ids },
499                        ))
500                    }
501                }
502            }
503        };
504
505        #[expect(clippy::collapsible_if)]
506        if let Some(Command::DropSubscription {
507            subscription_id,
508            upstream_mv_table_id,
509        }) = command
510        {
511            if self
512                .database_info
513                .unregister_subscriber(
514                    upstream_mv_table_id.as_job_id(),
515                    subscription_id.as_subscriber_id(),
516                )
517                .is_none()
518            {
519                warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
520            }
521        }
522
523        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
524            if !finished_snapshot_backfill_jobs.contains(job_id) {
525                creating_job.on_new_upstream_barrier(control_stream_manager, barrier_info)?;
526            }
527        }
528
529        let node_to_collect = control_stream_manager.inject_barrier(
530            self.database_id,
531            None,
532            mutation,
533            barrier_info,
534            &node_actors,
535            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
536            actors_to_create,
537        )?;
538
539        Ok(ApplyCommandInfo {
540            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
541            table_ids_to_commit,
542            jobs_to_wait: finished_snapshot_backfill_jobs,
543            node_to_collect,
544            command,
545        })
546    }
547}