risingwave_meta/barrier/checkpoint/state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19
20use risingwave_common::bail;
21use risingwave_common::catalog::TableId;
22use risingwave_common::id::JobId;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
27use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
28use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use risingwave_pb::stream_plan::{
31    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
32    PbUpstreamSinkInfo, ThrottleMutation,
33};
34use tracing::warn;
35
36use crate::MetaResult;
37use crate::barrier::cdc_progress::CdcTableBackfillTracker;
38use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
39use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
40use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
41use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
42use crate::barrier::info::{
43    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
44    SubscriberType,
45};
46use crate::barrier::notifier::Notifier;
47use crate::barrier::partial_graph::PartialGraphManager;
48use crate::barrier::rpc::to_partial_graph_id;
49use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
50use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
51use crate::model::{ActorId, ActorNewNoShuffle, FragmentId, StreamJobActorsToCreate};
52use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
53use crate::stream::{
54    GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
55    fill_snapshot_backfill_epoch,
56};
57
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers.
    /// Accumulated on every barrier and drained into `BarrierKind::Checkpoint`
    /// when the next checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    is_paused: bool,
}
72
73impl BarrierWorkerState {
74    pub(super) fn new() -> Self {
75        Self {
76            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
77            pending_non_checkpoint_barriers: vec![],
78            is_paused: false,
79        }
80    }
81
82    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
83        Self {
84            in_flight_prev_epoch,
85            pending_non_checkpoint_barriers: vec![],
86            is_paused,
87        }
88    }
89
90    pub fn is_paused(&self) -> bool {
91        self.is_paused
92    }
93
94    fn set_is_paused(&mut self, is_paused: bool) {
95        if self.is_paused != is_paused {
96            tracing::info!(
97                currently_paused = self.is_paused,
98                newly_paused = is_paused,
99                "update paused state"
100            );
101            self.is_paused = is_paused;
102        }
103    }
104
105    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
106        &self.in_flight_prev_epoch
107    }
108
109    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
110    pub fn next_barrier_info(
111        &mut self,
112        is_checkpoint: bool,
113        curr_epoch: TracedEpoch,
114    ) -> BarrierInfo {
115        assert!(
116            self.in_flight_prev_epoch.value() < curr_epoch.value(),
117            "curr epoch regress. {} > {}",
118            self.in_flight_prev_epoch.value(),
119            curr_epoch.value()
120        );
121        let prev_epoch = self.in_flight_prev_epoch.clone();
122        self.in_flight_prev_epoch = curr_epoch.clone();
123        self.pending_non_checkpoint_barriers
124            .push(prev_epoch.value().0);
125        let kind = if is_checkpoint {
126            let epochs = take(&mut self.pending_non_checkpoint_barriers);
127            BarrierKind::Checkpoint(epochs)
128        } else {
129            BarrierKind::Barrier
130        };
131        BarrierInfo {
132            prev_epoch,
133            curr_epoch,
134            kind,
135        }
136    }
137}
138
/// Aggregated output of `DatabaseCheckpointControl::apply_command`, consumed by the
/// barrier injection path.
pub(super) struct ApplyCommandInfo {
    /// Per upstream MV table id, the maximum retention among its subscribers.
    /// NOTE(review): unit (seconds vs. epochs) is not visible here — confirm with callers.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State table ids whose data should be committed at this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    /// Presumably the creating jobs that must be awaited for this barrier — verify against caller.
    pub jobs_to_wait: HashSet<JobId>,
    /// Command to execute after the barrier has been collected.
    pub command: PostCollectCommand,
}
145
/// Result tuple of `apply_command`: mutation to inject with the barrier, table IDs
/// to commit, actors to create, node actors (actor ids to collect, keyed by worker),
/// and the post-collect command.
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
155
156impl DatabaseCheckpointControl {
157    /// Collect table IDs to commit and actor IDs to collect from current fragment infos.
158    fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
159        let table_ids_to_commit = self.database_info.existing_table_ids().collect();
160        let node_actors =
161            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
162        (table_ids_to_commit, node_actors)
163    }
164
165    /// Helper for the simplest command variants: those that only need a
166    /// pre-computed mutation and a command name, with no actors to create
167    /// and no additional side effects on `self`.
168    fn apply_simple_command(
169        &self,
170        mutation: Option<Mutation>,
171        command_name: &'static str,
172    ) -> ApplyCommandResult {
173        let (table_ids, node_actors) = self.collect_base_info();
174        (
175            mutation,
176            table_ids,
177            None,
178            node_actors,
179            PostCollectCommand::Command(command_name.to_owned()),
180        )
181    }
182
183    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
184    /// will be removed from the state after the info get resolved.
185    pub(super) fn apply_command(
186        &mut self,
187        command: Option<Command>,
188        notifiers: &mut Vec<Notifier>,
189        barrier_info: &BarrierInfo,
190        partial_graph_manager: &mut PartialGraphManager,
191        hummock_version_stats: &HummockVersionStats,
192    ) -> MetaResult<ApplyCommandInfo> {
193        debug_assert!(
194            !matches!(
195                command,
196                Some(Command::RescheduleIntent {
197                    reschedule_plan: None,
198                    ..
199                })
200            ),
201            "reschedule intent must be resolved before apply"
202        );
203        if matches!(
204            command,
205            Some(Command::RescheduleIntent {
206                reschedule_plan: None,
207                ..
208            })
209        ) {
210            bail!("reschedule intent must be resolved before apply");
211        }
212
213        /// Resolve source splits for a create streaming job command.
214        ///
215        /// Combines source fragment split resolution and backfill split alignment
216        /// into one step, looking up existing upstream actor splits from the inflight database info.
217        fn resolve_source_splits(
218            info: &CreateStreamingJobCommandInfo,
219            actor_no_shuffle: &ActorNewNoShuffle,
220            database_info: &InflightDatabaseInfo,
221        ) -> MetaResult<SplitAssignment> {
222            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
223                &info.stream_job_fragments,
224                &info.init_split_assignment,
225            )?;
226            resolved.extend(SourceManager::resolve_backfill_splits(
227                &info.stream_job_fragments,
228                actor_no_shuffle,
229                |fragment_id, actor_id| {
230                    database_info
231                        .fragment(fragment_id)
232                        .actors
233                        .get(&actor_id)
234                        .map(|info| info.splits.clone())
235                },
236            )?);
237            Ok(resolved)
238        }
239
240        // Throttle data for creating jobs (set only in the Throttle arm)
241        let mut throttle_for_creating_jobs: Option<(
242            HashSet<JobId>,
243            HashMap<FragmentId, ThrottleConfig>,
244        )> = None;
245
246        // Each variant handles its own pre-apply, edge building, mutation generation,
247        // collect base info, and post-apply. The match produces values consumed by the
248        // common snapshot-backfill-merging code that follows.
249        let (
250            mutation,
251            mut table_ids_to_commit,
252            mut actors_to_create,
253            mut node_actors,
254            post_collect_command,
255        ) = match command {
256            None => self.apply_simple_command(None, "barrier"),
257            Some(Command::CreateStreamingJob {
258                mut info,
259                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
260                cross_db_snapshot_backfill_info,
261            }) => {
262                {
263                    assert!(!self.state.is_paused());
264                    let snapshot_epoch = barrier_info.prev_epoch();
265                    // set snapshot epoch of upstream table for snapshot backfill
266                    for snapshot_backfill_epoch in snapshot_backfill_info
267                        .upstream_mv_table_id_to_backfill_epoch
268                        .values_mut()
269                    {
270                        assert_eq!(
271                            snapshot_backfill_epoch.replace(snapshot_epoch),
272                            None,
273                            "must not set previously"
274                        );
275                    }
276                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
277                        fill_snapshot_backfill_epoch(
278                            &mut fragment.nodes,
279                            Some(&snapshot_backfill_info),
280                            &cross_db_snapshot_backfill_info,
281                        )?;
282                    }
283                    let job_id = info.stream_job_fragments.stream_job_id();
284                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
285                        .upstream_mv_table_id_to_backfill_epoch
286                        .keys()
287                        .cloned()
288                        .collect();
289
290                    // Build edges first (needed for no-shuffle mapping used in split resolution)
291                    let mut edges = self.database_info.build_edge(
292                        Some((&info, true)),
293                        None,
294                        None,
295                        partial_graph_manager.control_stream_manager(),
296                    );
297                    let actor_no_shuffle = edges.extract_no_shuffle();
298
299                    // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
300                    let resolved_split_assignment =
301                        resolve_source_splits(&info, &actor_no_shuffle, &self.database_info)?;
302
303                    let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
304                    else {
305                        panic!("duplicated creating snapshot backfill job {job_id}");
306                    };
307
308                    let job = CreatingStreamingJobControl::new(
309                        entry,
310                        CreateSnapshotBackfillJobCommandInfo {
311                            info: info.clone(),
312                            snapshot_backfill_info: snapshot_backfill_info.clone(),
313                            cross_db_snapshot_backfill_info,
314                            resolved_split_assignment: resolved_split_assignment.clone(),
315                        },
316                        take(notifiers),
317                        snapshot_backfill_upstream_tables,
318                        snapshot_epoch,
319                        hummock_version_stats,
320                        partial_graph_manager,
321                        &mut edges,
322                        &resolved_split_assignment,
323                    )?;
324
325                    self.database_info
326                        .shared_actor_infos
327                        .upsert(self.database_id, job.fragment_infos_with_job_id());
328
329                    for upstream_mv_table_id in snapshot_backfill_info
330                        .upstream_mv_table_id_to_backfill_epoch
331                        .keys()
332                    {
333                        self.database_info.register_subscriber(
334                            upstream_mv_table_id.as_job_id(),
335                            info.streaming_job.id().as_subscriber_id(),
336                            SubscriberType::SnapshotBackfill,
337                        );
338                    }
339
340                    let mutation = Command::create_streaming_job_to_mutation(
341                        &info,
342                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
343                        self.state.is_paused(),
344                        &mut edges,
345                        partial_graph_manager.control_stream_manager(),
346                        None,
347                        &resolved_split_assignment,
348                    )?;
349
350                    let (table_ids, node_actors) = self.collect_base_info();
351                    (
352                        Some(mutation),
353                        table_ids,
354                        None,
355                        node_actors,
356                        PostCollectCommand::barrier(),
357                    )
358                }
359            }
360            Some(Command::CreateStreamingJob {
361                mut info,
362                job_type,
363                cross_db_snapshot_backfill_info,
364            }) => {
365                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
366                    fill_snapshot_backfill_epoch(
367                        &mut fragment.nodes,
368                        None,
369                        &cross_db_snapshot_backfill_info,
370                    )?;
371                }
372
373                // Build edges
374                let new_upstream_sink =
375                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
376                        Some(ctx)
377                    } else {
378                        None
379                    };
380
381                let mut edges = self.database_info.build_edge(
382                    Some((&info, false)),
383                    None,
384                    new_upstream_sink,
385                    partial_graph_manager.control_stream_manager(),
386                );
387                let actor_no_shuffle = edges.extract_no_shuffle();
388
389                // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
390                let resolved_split_assignment =
391                    resolve_source_splits(&info, &actor_no_shuffle, &self.database_info)?;
392
393                // Pre-apply: add new job and fragments
394                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
395                    let (fragment, _) =
396                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
397                            .expect("should have parallel cdc fragment");
398                    Some(CdcTableBackfillTracker::new(
399                        fragment.fragment_id,
400                        splits.clone(),
401                    ))
402                } else {
403                    None
404                };
405                self.database_info
406                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
407                self.database_info.pre_apply_new_fragments(
408                    info.stream_job_fragments
409                        .new_fragment_info(&resolved_split_assignment)
410                        .map(|(fragment_id, fragment_infos)| {
411                            (fragment_id, info.streaming_job.id(), fragment_infos)
412                        }),
413                );
414                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
415                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
416                    self.database_info.pre_apply_add_node_upstream(
417                        downstream_fragment_id,
418                        &PbUpstreamSinkInfo {
419                            upstream_fragment_id: ctx.sink_fragment_id,
420                            sink_output_schema: ctx.sink_output_fields.clone(),
421                            project_exprs: ctx.project_exprs.clone(),
422                        },
423                    );
424                }
425
426                let (table_ids, node_actors) = self.collect_base_info();
427
428                // Actors to create
429                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
430                    &info, &mut edges,
431                ));
432
433                // CDC table snapshot splits
434                let actor_cdc_table_snapshot_splits = self
435                    .database_info
436                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
437
438                // Mutation
439                let is_currently_paused = self.state.is_paused();
440                let mutation = Command::create_streaming_job_to_mutation(
441                    &info,
442                    &job_type,
443                    is_currently_paused,
444                    &mut edges,
445                    partial_graph_manager.control_stream_manager(),
446                    actor_cdc_table_snapshot_splits,
447                    &resolved_split_assignment,
448                )?;
449
450                (
451                    Some(mutation),
452                    table_ids,
453                    actors_to_create,
454                    node_actors,
455                    PostCollectCommand::CreateStreamingJob {
456                        info,
457                        job_type,
458                        cross_db_snapshot_backfill_info,
459                        resolved_split_assignment,
460                    },
461                )
462            }
463
464            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
465
466            Some(Command::Pause) => {
467                let prev_is_paused = self.state.is_paused();
468                self.state.set_is_paused(true);
469                let mutation = Command::pause_to_mutation(prev_is_paused);
470                let (table_ids, node_actors) = self.collect_base_info();
471                (
472                    mutation,
473                    table_ids,
474                    None,
475                    node_actors,
476                    PostCollectCommand::Command("Pause".to_owned()),
477                )
478            }
479
480            Some(Command::Resume) => {
481                let prev_is_paused = self.state.is_paused();
482                self.state.set_is_paused(false);
483                let mutation = Command::resume_to_mutation(prev_is_paused);
484                let (table_ids, node_actors) = self.collect_base_info();
485                (
486                    mutation,
487                    table_ids,
488                    None,
489                    node_actors,
490                    PostCollectCommand::Command("Resume".to_owned()),
491                )
492            }
493
494            Some(Command::Throttle { jobs, config }) => {
495                let mutation = Some(Command::throttle_to_mutation(&config));
496                throttle_for_creating_jobs = Some((jobs, config));
497                self.apply_simple_command(mutation, "Throttle")
498            }
499
500            Some(Command::DropStreamingJobs {
501                streaming_job_ids,
502                actors,
503                unregistered_state_table_ids,
504                unregistered_fragment_ids,
505                dropped_sink_fragment_by_targets,
506            }) => {
507                // pre_apply: drop node upstream for sink targets
508                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
509                    self.database_info
510                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
511                }
512
513                let (table_ids, node_actors) = self.collect_base_info();
514
515                // post_apply: remove fragments
516                self.database_info
517                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
518
519                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
520                    &actors,
521                    &dropped_sink_fragment_by_targets,
522                ));
523                (
524                    mutation,
525                    table_ids,
526                    None,
527                    node_actors,
528                    PostCollectCommand::DropStreamingJobs {
529                        streaming_job_ids,
530                        unregistered_state_table_ids,
531                    },
532                )
533            }
534
535            Some(Command::RescheduleIntent {
536                reschedule_plan, ..
537            }) => {
538                let ReschedulePlan {
539                    reschedules,
540                    fragment_actors,
541                } = reschedule_plan
542                    .as_ref()
543                    .expect("reschedule intent should be resolved in global barrier worker");
544
545                // Pre-apply: reschedule fragments
546                for (fragment_id, reschedule) in reschedules {
547                    self.database_info.pre_apply_reschedule(
548                        *fragment_id,
549                        reschedule
550                            .added_actors
551                            .iter()
552                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
553                                actors.iter().map(|actor_id| {
554                                    (
555                                        *actor_id,
556                                        InflightActorInfo {
557                                            worker_id: *node_id,
558                                            vnode_bitmap: reschedule
559                                                .newly_created_actors
560                                                .get(actor_id)
561                                                .expect("should exist")
562                                                .0
563                                                .0
564                                                .vnode_bitmap
565                                                .clone(),
566                                            splits: reschedule
567                                                .actor_splits
568                                                .get(actor_id)
569                                                .cloned()
570                                                .unwrap_or_default(),
571                                        },
572                                    )
573                                })
574                            })
575                            .collect(),
576                        reschedule
577                            .vnode_bitmap_updates
578                            .iter()
579                            .filter(|(actor_id, _)| {
580                                !reschedule.newly_created_actors.contains_key(*actor_id)
581                            })
582                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
583                            .collect(),
584                        reschedule.actor_splits.clone(),
585                    );
586                }
587
588                let (table_ids, node_actors) = self.collect_base_info();
589
590                // Actors to create
591                let actors_to_create = Some(Command::reschedule_actors_to_create(
592                    reschedules,
593                    fragment_actors,
594                    &self.database_info,
595                    partial_graph_manager.control_stream_manager(),
596                ));
597
598                // Post-apply: remove old actors
599                self.database_info
600                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
601                        (
602                            *fragment_id,
603                            reschedule.removed_actors.iter().cloned().collect(),
604                        )
605                    }));
606
607                // Mutation
608                let mutation = Command::reschedule_to_mutation(
609                    reschedules,
610                    fragment_actors,
611                    partial_graph_manager.control_stream_manager(),
612                    &mut self.database_info,
613                )?;
614
615                let reschedules = reschedule_plan
616                    .expect("reschedule intent should be resolved in global barrier worker")
617                    .reschedules;
618                (
619                    mutation,
620                    table_ids,
621                    actors_to_create,
622                    node_actors,
623                    PostCollectCommand::Reschedule { reschedules },
624                )
625            }
626
627            Some(Command::ReplaceStreamJob(plan)) => {
628                // Build edges first (needed for no-shuffle mapping used in split resolution)
629                let mut edges = self.database_info.build_edge(
630                    None,
631                    Some(&plan),
632                    None,
633                    partial_graph_manager.control_stream_manager(),
634                );
635                let actor_no_shuffle = edges.extract_no_shuffle();
636
637                // Phase 2: Resolve splits to actor-level assignment.
638                let resolved_split_assignment = match &plan.split_plan {
639                    ReplaceJobSplitPlan::Discovered(discovered) => {
640                        SourceManager::resolve_fragment_to_actor_splits(
641                            &plan.new_fragments,
642                            discovered,
643                        )?
644                    }
645                    ReplaceJobSplitPlan::AlignFromPrevious => {
646                        SourceManager::resolve_replace_source_splits(
647                            &plan.new_fragments,
648                            &plan.replace_upstream,
649                            &actor_no_shuffle,
650                            |_fragment_id, actor_id| {
651                                self.database_info.fragment_infos().find_map(|fragment| {
652                                    fragment
653                                        .actors
654                                        .get(&actor_id)
655                                        .map(|info| info.splits.clone())
656                                })
657                            },
658                        )?
659                    }
660                };
661
662                // Pre-apply: add new fragments and replace upstream
663                self.database_info.pre_apply_new_fragments(
664                    plan.new_fragments
665                        .new_fragment_info(&resolved_split_assignment)
666                        .map(|(fragment_id, new_fragment)| {
667                            (fragment_id, plan.streaming_job.id(), new_fragment)
668                        }),
669                );
670                for (fragment_id, replace_map) in &plan.replace_upstream {
671                    self.database_info
672                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
673                }
674                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
675                    self.database_info
676                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
677                            (
678                                sink.new_fragment.fragment_id,
679                                sink.original_sink.id.as_job_id(),
680                                sink.new_fragment_info(),
681                            )
682                        }));
683                }
684
685                let (table_ids, node_actors) = self.collect_base_info();
686
687                // Actors to create
688                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
689                    &plan,
690                    &mut edges,
691                    &self.database_info,
692                ));
693
694                // Post-apply: remove old fragments
695                {
696                    let mut fragment_ids_to_remove: Vec<_> = plan
697                        .old_fragments
698                        .fragments
699                        .values()
700                        .map(|f| f.fragment_id)
701                        .collect();
702                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
703                        fragment_ids_to_remove
704                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
705                    }
706                    self.database_info
707                        .post_apply_remove_fragments(fragment_ids_to_remove);
708                }
709
710                // Mutation
711                let mutation = Command::replace_stream_job_to_mutation(
712                    &plan,
713                    &mut edges,
714                    &mut self.database_info,
715                    &resolved_split_assignment,
716                )?;
717
718                (
719                    mutation,
720                    table_ids,
721                    actors_to_create,
722                    node_actors,
723                    PostCollectCommand::ReplaceStreamJob {
724                        plan,
725                        resolved_split_assignment,
726                    },
727                )
728            }
729
730            Some(Command::SourceChangeSplit(split_state)) => {
731                // Pre-apply: split assignments
732                self.database_info.pre_apply_split_assignments(
733                    split_state
734                        .split_assignment
735                        .iter()
736                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
737                );
738
739                let mutation = Some(Command::source_change_split_to_mutation(
740                    &split_state.split_assignment,
741                ));
742                let (table_ids, node_actors) = self.collect_base_info();
743                (
744                    mutation,
745                    table_ids,
746                    None,
747                    node_actors,
748                    PostCollectCommand::SourceChangeSplit {
749                        split_assignment: split_state.split_assignment,
750                    },
751                )
752            }
753
754            Some(Command::CreateSubscription {
755                subscription_id,
756                upstream_mv_table_id,
757                retention_second,
758            }) => {
759                self.database_info.register_subscriber(
760                    upstream_mv_table_id.as_job_id(),
761                    subscription_id.as_subscriber_id(),
762                    SubscriberType::Subscription(retention_second),
763                );
764                let mutation = Some(Command::create_subscription_to_mutation(
765                    upstream_mv_table_id,
766                    subscription_id,
767                ));
768                let (table_ids, node_actors) = self.collect_base_info();
769                (
770                    mutation,
771                    table_ids,
772                    None,
773                    node_actors,
774                    PostCollectCommand::CreateSubscription { subscription_id },
775                )
776            }
777
778            Some(Command::DropSubscription {
779                subscription_id,
780                upstream_mv_table_id,
781            }) => {
782                if self
783                    .database_info
784                    .unregister_subscriber(
785                        upstream_mv_table_id.as_job_id(),
786                        subscription_id.as_subscriber_id(),
787                    )
788                    .is_none()
789                {
790                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
791                }
792                let mutation = Some(Command::drop_subscription_to_mutation(
793                    upstream_mv_table_id,
794                    subscription_id,
795                ));
796                let (table_ids, node_actors) = self.collect_base_info();
797                (
798                    mutation,
799                    table_ids,
800                    None,
801                    node_actors,
802                    PostCollectCommand::Command("DropSubscription".to_owned()),
803                )
804            }
805
806            Some(Command::AlterSubscriptionRetention {
807                subscription_id,
808                upstream_mv_table_id,
809                retention_second,
810            }) => {
811                self.database_info.update_subscription_retention(
812                    upstream_mv_table_id.as_job_id(),
813                    subscription_id.as_subscriber_id(),
814                    retention_second,
815                );
816                self.apply_simple_command(None, "AlterSubscriptionRetention")
817            }
818
819            Some(Command::ConnectorPropsChange(config)) => {
820                let mutation = Some(Command::connector_props_change_to_mutation(&config));
821                let (table_ids, node_actors) = self.collect_base_info();
822                (
823                    mutation,
824                    table_ids,
825                    None,
826                    node_actors,
827                    PostCollectCommand::ConnectorPropsChange(config),
828                )
829            }
830
831            Some(Command::Refresh {
832                table_id,
833                associated_source_id,
834            }) => {
835                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
836                self.apply_simple_command(mutation, "Refresh")
837            }
838
839            Some(Command::ListFinish {
840                table_id: _,
841                associated_source_id,
842            }) => {
843                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
844                self.apply_simple_command(mutation, "ListFinish")
845            }
846
847            Some(Command::LoadFinish {
848                table_id: _,
849                associated_source_id,
850            }) => {
851                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
852                self.apply_simple_command(mutation, "LoadFinish")
853            }
854
855            Some(Command::ResetSource { source_id }) => {
856                let mutation = Some(Command::reset_source_to_mutation(source_id));
857                self.apply_simple_command(mutation, "ResetSource")
858            }
859
860            Some(Command::ResumeBackfill { target }) => {
861                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
862                let (table_ids, node_actors) = self.collect_base_info();
863                (
864                    mutation,
865                    table_ids,
866                    None,
867                    node_actors,
868                    PostCollectCommand::ResumeBackfill { target },
869                )
870            }
871
872            Some(Command::InjectSourceOffsets {
873                source_id,
874                split_offsets,
875            }) => {
876                let mutation = Some(Command::inject_source_offsets_to_mutation(
877                    source_id,
878                    &split_offsets,
879                ));
880                self.apply_simple_command(mutation, "InjectSourceOffsets")
881            }
882        };
883
884        let mut finished_snapshot_backfill_jobs = HashSet::new();
885        let mutation = match mutation {
886            Some(mutation) => Some(mutation),
887            None => {
888                let mut finished_snapshot_backfill_job_info = HashMap::new();
889                if barrier_info.kind.is_checkpoint() {
890                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
891                        if creating_job.should_merge_to_upstream() {
892                            let info = creating_job
893                                .start_consume_upstream(partial_graph_manager, barrier_info)?;
894                            finished_snapshot_backfill_job_info
895                                .try_insert(job_id, info)
896                                .expect("non-duplicated");
897                        }
898                    }
899                }
900
901                if !finished_snapshot_backfill_job_info.is_empty() {
902                    let actors_to_create = actors_to_create.get_or_insert_default();
903                    let mut subscriptions_to_drop = vec![];
904                    let mut dispatcher_update = vec![];
905                    let mut actor_splits = HashMap::new();
906                    for (job_id, info) in finished_snapshot_backfill_job_info {
907                        finished_snapshot_backfill_jobs.insert(job_id);
908                        subscriptions_to_drop.extend(
909                            info.snapshot_backfill_upstream_tables.iter().map(
910                                |upstream_table_id| PbSubscriptionUpstreamInfo {
911                                    subscriber_id: job_id.as_subscriber_id(),
912                                    upstream_mv_table_id: *upstream_table_id,
913                                },
914                            ),
915                        );
916                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
917                            assert_matches!(
918                                self.database_info.unregister_subscriber(
919                                    upstream_mv_table_id.as_job_id(),
920                                    job_id.as_subscriber_id()
921                                ),
922                                Some(SubscriberType::SnapshotBackfill)
923                            );
924                        }
925
926                        table_ids_to_commit.extend(
927                            info.fragment_infos
928                                .values()
929                                .flat_map(|fragment| fragment.state_table_ids.iter())
930                                .copied(),
931                        );
932
933                        let actor_len = info
934                            .fragment_infos
935                            .values()
936                            .map(|fragment| fragment.actors.len() as u64)
937                            .sum();
938                        let id_gen = GlobalActorIdGen::new(
939                            partial_graph_manager
940                                .control_stream_manager()
941                                .env
942                                .actor_id_generator(),
943                            actor_len,
944                        );
945                        let mut next_local_actor_id = 0;
946                        // mapping from old_actor_id to new_actor_id
947                        let actor_mapping: HashMap<_, _> = info
948                            .fragment_infos
949                            .values()
950                            .flat_map(|fragment| fragment.actors.keys())
951                            .map(|old_actor_id| {
952                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
953                                next_local_actor_id += 1;
954                                (*old_actor_id, new_actor_id.as_global_id())
955                            })
956                            .collect();
957                        let actor_mapping = &actor_mapping;
958                        let new_stream_actors: HashMap<_, _> = info
959                            .stream_actors
960                            .into_iter()
961                            .map(|(old_actor_id, mut actor)| {
962                                let new_actor_id = actor_mapping[&old_actor_id];
963                                actor.actor_id = new_actor_id;
964                                (new_actor_id, actor)
965                            })
966                            .collect();
967                        let new_fragment_info: HashMap<_, _> = info
968                            .fragment_infos
969                            .into_iter()
970                            .map(|(fragment_id, mut fragment)| {
971                                let actors = take(&mut fragment.actors);
972                                fragment.actors = actors
973                                    .into_iter()
974                                    .map(|(old_actor_id, actor)| {
975                                        let new_actor_id = actor_mapping[&old_actor_id];
976                                        (new_actor_id, actor)
977                                    })
978                                    .collect();
979                                (fragment_id, fragment)
980                            })
981                            .collect();
982                        actor_splits.extend(
983                            new_fragment_info
984                                .values()
985                                .flat_map(|fragment| &fragment.actors)
986                                .map(|(actor_id, actor)| {
987                                    (
988                                        *actor_id,
989                                        ConnectorSplits {
990                                            splits: actor
991                                                .splits
992                                                .iter()
993                                                .map(ConnectorSplit::from)
994                                                .collect(),
995                                        },
996                                    )
997                                }),
998                        );
999                        // new actors belong to the database partial graph
1000                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
1001                        let mut edge_builder = FragmentEdgeBuilder::new(
1002                            info.upstream_fragment_downstreams
1003                                .keys()
1004                                .map(|upstream_fragment_id| {
1005                                    self.database_info.fragment(*upstream_fragment_id)
1006                                })
1007                                .chain(new_fragment_info.values())
1008                                .map(|fragment| {
1009                                    (
1010                                        fragment.fragment_id,
1011                                        EdgeBuilderFragmentInfo::from_inflight(
1012                                            fragment,
1013                                            partial_graph_id,
1014                                            partial_graph_manager.control_stream_manager(),
1015                                        ),
1016                                    )
1017                                }),
1018                        );
1019                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
1020                        edge_builder.add_relations(&info.downstreams);
1021                        let mut edges = edge_builder.build();
1022                        let new_actors_to_create = edges.collect_actors_to_create(
1023                            new_fragment_info.values().map(|fragment| {
1024                                (
1025                                    fragment.fragment_id,
1026                                    &fragment.nodes,
1027                                    fragment.actors.iter().map(|(actor_id, actor)| {
1028                                        (&new_stream_actors[actor_id], actor.worker_id)
1029                                    }),
1030                                    [], // no initial subscriber for backfilling job
1031                                )
1032                            }),
1033                        );
1034                        dispatcher_update.extend(
1035                            info.upstream_fragment_downstreams.keys().flat_map(
1036                                |upstream_fragment_id| {
1037                                    let new_actor_dispatchers = edges
1038                                        .dispatchers
1039                                        .remove(upstream_fragment_id)
1040                                        .expect("should exist");
1041                                    new_actor_dispatchers.into_iter().flat_map(
1042                                        |(upstream_actor_id, dispatchers)| {
1043                                            dispatchers.into_iter().map(move |dispatcher| {
1044                                                PbDispatcherUpdate {
1045                                                    actor_id: upstream_actor_id,
1046                                                    dispatcher_id: dispatcher.dispatcher_id,
1047                                                    hash_mapping: dispatcher.hash_mapping,
1048                                                    removed_downstream_actor_id: dispatcher
1049                                                        .downstream_actor_id
1050                                                        .iter()
1051                                                        .map(|new_downstream_actor_id| {
1052                                                            actor_mapping
1053                                                            .iter()
1054                                                            .find_map(
1055                                                                |(old_actor_id, new_actor_id)| {
1056                                                                    (new_downstream_actor_id
1057                                                                        == new_actor_id)
1058                                                                        .then_some(*old_actor_id)
1059                                                                },
1060                                                            )
1061                                                            .expect("should exist")
1062                                                        })
1063                                                        .collect(),
1064                                                    added_downstream_actor_id: dispatcher
1065                                                        .downstream_actor_id,
1066                                                }
1067                                            })
1068                                        },
1069                                    )
1070                                },
1071                            ),
1072                        );
1073                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1074                        for (worker_id, worker_actors) in new_actors_to_create {
1075                            node_actors.entry(worker_id).or_default().extend(
1076                                worker_actors.values().flat_map(|(_, actors, _)| {
1077                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
1078                                }),
1079                            );
1080                            actors_to_create
1081                                .entry(worker_id)
1082                                .or_default()
1083                                .extend(worker_actors);
1084                        }
1085                        self.database_info.add_existing(InflightStreamingJobInfo {
1086                            job_id,
1087                            fragment_infos: new_fragment_info,
1088                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
1089                            status: CreateStreamingJobStatus::Created,
1090                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
1091                        });
1092                    }
1093
1094                    Some(PbMutation::Update(PbUpdateMutation {
1095                        dispatcher_update,
1096                        merge_update: vec![], // no upstream update on existing actors
1097                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
1098                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
1099                        actor_splits,
1100                        actor_new_dispatchers: Default::default(), // no new dispatcher
1101                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
1102                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
1103                        subscriptions_to_drop,
1104                    }))
1105                } else {
1106                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
1107                    if fragment_ids.is_empty() {
1108                        None
1109                    } else {
1110                        Some(PbMutation::StartFragmentBackfill(
1111                            PbStartFragmentBackfillMutation { fragment_ids },
1112                        ))
1113                    }
1114                }
1115            }
1116        };
1117
1118        // Forward barrier to creating streaming job controls
1119        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
1120            if !finished_snapshot_backfill_jobs.contains(job_id) {
1121                let throttle_mutation = if let Some((ref jobs, ref config)) =
1122                    throttle_for_creating_jobs
1123                    && jobs.contains(job_id)
1124                {
1125                    assert_eq!(
1126                        jobs.len(),
1127                        1,
1128                        "should not alter rate limit of snapshot backfill job with other jobs"
1129                    );
1130                    Some((
1131                        Mutation::Throttle(ThrottleMutation {
1132                            fragment_throttle: config
1133                                .iter()
1134                                .map(|(fragment_id, config)| (*fragment_id, *config))
1135                                .collect(),
1136                        }),
1137                        take(notifiers),
1138                    ))
1139                } else {
1140                    None
1141                };
1142                creating_job.on_new_upstream_barrier(
1143                    partial_graph_manager,
1144                    barrier_info,
1145                    throttle_mutation,
1146                )?;
1147            }
1148        }
1149
1150        partial_graph_manager.inject_barrier(
1151            to_partial_graph_id(self.database_id, None),
1152            mutation,
1153            barrier_info,
1154            &node_actors,
1155            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1156            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1157            actors_to_create,
1158        )?;
1159
1160        Ok(ApplyCommandInfo {
1161            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
1162            table_ids_to_commit,
1163            jobs_to_wait: finished_snapshot_backfill_jobs,
1164            command: post_collect_command,
1165        })
1166    }
1167}