risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38    PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{
45    CreatingStreamingJobControl, DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52    SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60    ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
61    build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64    ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65    StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69    GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70    fill_snapshot_backfill_epoch,
71};
72
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The `prev_epoch` of the most recently injected (in-flight) barrier.
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers.
    /// Drained into `BarrierKind::Checkpoint` when the next checkpoint barrier is issued
    /// (see `next_barrier_info`).
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    is_paused: bool,
}
87
88impl BarrierWorkerState {
89    pub(super) fn new() -> Self {
90        Self {
91            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92            pending_non_checkpoint_barriers: vec![],
93            is_paused: false,
94        }
95    }
96
97    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98        Self {
99            in_flight_prev_epoch,
100            pending_non_checkpoint_barriers: vec![],
101            is_paused,
102        }
103    }
104
105    pub fn is_paused(&self) -> bool {
106        self.is_paused
107    }
108
109    fn set_is_paused(&mut self, is_paused: bool) {
110        if self.is_paused != is_paused {
111            tracing::info!(
112                currently_paused = self.is_paused,
113                newly_paused = is_paused,
114                "update paused state"
115            );
116            self.is_paused = is_paused;
117        }
118    }
119
120    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121        &self.in_flight_prev_epoch
122    }
123
124    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
125    pub fn next_barrier_info(
126        &mut self,
127        is_checkpoint: bool,
128        curr_epoch: TracedEpoch,
129    ) -> BarrierInfo {
130        assert!(
131            self.in_flight_prev_epoch.value() < curr_epoch.value(),
132            "curr epoch regress. {} > {}",
133            self.in_flight_prev_epoch.value(),
134            curr_epoch.value()
135        );
136        let prev_epoch = self.in_flight_prev_epoch.clone();
137        self.in_flight_prev_epoch = curr_epoch.clone();
138        self.pending_non_checkpoint_barriers
139            .push(prev_epoch.value().0);
140        let kind = if is_checkpoint {
141            let epochs = take(&mut self.pending_non_checkpoint_barriers);
142            BarrierKind::Checkpoint(epochs)
143        } else {
144            BarrierKind::Barrier
145        };
146        BarrierInfo {
147            prev_epoch,
148            curr_epoch,
149            kind,
150        }
151    }
152}
153
/// Extra info returned by `apply_command` to its caller.
pub(super) struct ApplyCommandInfo {
    // NOTE(review): presumably the IDs of creating streaming jobs whose collection the
    // issued barrier must additionally wait on — the setter is outside this view; confirm
    // against the callers of `apply_command`.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Result tuple of `apply_command`:
/// - the barrier mutation to inject (if any),
/// - the set of table IDs to commit,
/// - actors to create for a new streaming job (if any),
/// - actor IDs to collect, grouped by worker,
/// - the post-collect command to run after the barrier is collected.
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
167
/// Result of actor rendering for a create/replace streaming job.
/// Produced by [`render_actors`] and consumed when building edges and mutations.
pub(crate) struct RenderResult {
    /// Rendered actors for each newly created fragment.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// The worker each rendered actor is placed on.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176/// Derive `NoShuffle` edges from fragment downstream relations and resolve ensembles.
177///
178/// This scans both the internal downstream relations (`fragments.downstreams`) and
179/// the cross-boundary upstream-to-new-fragment relations (`upstream_fragment_downstreams`)
180/// to find all `NoShuffle` edges. It then runs BFS to find connected components (ensembles)
181/// and categorizes them into:
182/// - Ensembles whose entry fragments include existing (non-new) fragments
183/// - Ensembles whose entry fragments are all newly created
184fn resolve_no_shuffle_ensembles(
185    fragments: &StreamJobFragmentsToCreate,
186    upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188    // Derive FragmentNewNoShuffle from the two downstream relation maps.
189    let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191    // Internal edges (new → new) and edges from new → existing downstream (replace job).
192    for (upstream_fid, relations) in &fragments.downstreams {
193        for rel in relations {
194            if rel.dispatcher_type == DispatcherType::NoShuffle {
195                new_no_shuffle
196                    .entry(*upstream_fid)
197                    .or_default()
198                    .insert(rel.downstream_fragment_id);
199            }
200        }
201    }
202
203    // Cross-boundary edges: existing upstream → new downstream.
204    for (upstream_fid, relations) in upstream_fragment_downstreams {
205        for rel in relations {
206            if rel.dispatcher_type == DispatcherType::NoShuffle {
207                new_no_shuffle
208                    .entry(*upstream_fid)
209                    .or_default()
210                    .insert(rel.downstream_fragment_id);
211            }
212        }
213    }
214
215    let mut ensembles = if new_no_shuffle.is_empty() {
216        Vec::new()
217    } else {
218        // Flatten into directed edge pairs for BFS.
219        let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220            .iter()
221            .flat_map(|(upstream_fid, downstream_fids)| {
222                downstream_fids
223                    .iter()
224                    .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225            })
226            .collect();
227
228        let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229            .iter()
230            .flat_map(|(u, d)| [*u, *d])
231            .collect::<HashSet<_>>()
232            .into_iter()
233            .collect();
234
235        let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236        find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237    };
238
239    // Add standalone fragments (not covered by any ensemble) as single-fragment ensembles.
240    let covered: HashSet<FragmentId> = ensembles
241        .iter()
242        .flat_map(|e| e.component_fragments())
243        .collect();
244    for fragment_id in fragments.inner.fragments.keys() {
245        if !covered.contains(fragment_id) {
246            ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247        }
248    }
249
250    Ok(ensembles)
251}
252
/// Render actors for a create or replace streaming job.
///
/// This determines the parallelism for each no-shuffle ensemble (either from an existing
/// inflight upstream or computed fresh), and produces `StreamActor` instances with worker
/// placements and actor-level no-shuffle mappings.
///
/// The process follows three steps:
/// 1. For each ensemble, resolve `EnsembleActorTemplate` (from existing or fresh).
/// 2. For each new component fragment, allocate actor IDs and compute worker/vnode assignments.
/// 3. Expand the simple assignments into full `StreamActor` structures.
///
/// # Panics
/// Panics if existing fragments within one ensemble are not aligned (via
/// `assert_aligned_with`), or if all-new component fragments of one ensemble disagree
/// on vnode count.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // Steps 1 & 2: resolve a template and render actors for each ensemble.
    // For each new fragment, produce a simple assignment: actor_id -> (worker_id, vnode_bitmap).
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Determine the EnsembleActorTemplate for this ensemble.
        //
        // Check if any component fragment in the ensemble already exists (i.e. is inflight).
        // If so, derive the actor assignment from an existing fragment. Otherwise render fresh.
        //
        // "Existing" here means the fragment is NOT among the job's new fragments, so it
        // must already be tracked in `database_info`.
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // Sanity check: all existing fragments in the same ensemble must be aligned —
            // same actor count and same worker placement per vnode.
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // All fragments are new — render from scratch, using the first component
            // fragment's distribution and vnode count as the reference.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // Assert all component fragments in this ensemble share the same vnode count.
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                     different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Render each new component fragment in this ensemble. Actor IDs are allocated
        // from the shared `actor_id_counter` by the aligner.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                continue; // Skip existing fragments.
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Step 3: Expand simple assignments into full StreamActor structures.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
380impl DatabaseCheckpointControl {
381    /// Collect table IDs to commit and actor IDs to collect from current fragment infos.
382    fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
383        let table_ids_to_commit = self.database_info.existing_table_ids().collect();
384        let node_actors =
385            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
386        (table_ids_to_commit, node_actors)
387    }
388
389    /// Helper for the simplest command variants: those that only need a
390    /// pre-computed mutation and a command name, with no actors to create
391    /// and no additional side effects on `self`.
392    fn apply_simple_command(
393        &self,
394        mutation: Option<Mutation>,
395        command_name: &'static str,
396    ) -> ApplyCommandResult {
397        let (table_ids, node_actors) = self.collect_base_info();
398        (
399            mutation,
400            table_ids,
401            None,
402            node_actors,
403            PostCollectCommand::Command(command_name.to_owned()),
404        )
405    }
406
407    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
408    /// will be removed from the state after the info get resolved.
409    pub(super) fn apply_command(
410        &mut self,
411        command: Option<Command>,
412        notifiers: &mut Vec<Notifier>,
413        barrier_info: BarrierInfo,
414        partial_graph_manager: &mut PartialGraphManager,
415        hummock_version_stats: &HummockVersionStats,
416        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
417        worker_nodes: &HashMap<WorkerId, WorkerNode>,
418    ) -> MetaResult<ApplyCommandInfo> {
419        debug_assert!(
420            !matches!(
421                command,
422                Some(Command::RescheduleIntent {
423                    reschedule_plan: None,
424                    ..
425                })
426            ),
427            "reschedule intent must be resolved before apply"
428        );
429        if matches!(
430            command,
431            Some(Command::RescheduleIntent {
432                reschedule_plan: None,
433                ..
434            })
435        ) {
436            bail!("reschedule intent must be resolved before apply");
437        }
438
439        /// Resolve source splits for a create streaming job command.
440        ///
441        /// Combines source fragment split resolution and backfill split alignment
442        /// into one step, looking up existing upstream actor splits from the inflight database info.
443        fn resolve_source_splits(
444            info: &CreateStreamingJobCommandInfo,
445            render_result: &RenderResult,
446            actor_no_shuffle: &ActorNewNoShuffle,
447            database_info: &InflightDatabaseInfo,
448        ) -> MetaResult<SplitAssignment> {
449            let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
450                .stream_actors
451                .iter()
452                .map(|(fragment_id, actors)| {
453                    (
454                        *fragment_id,
455                        actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
456                    )
457                })
458                .collect();
459            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
460                &info.stream_job_fragments,
461                &info.init_split_assignment,
462                &fragment_actor_ids,
463            )?;
464            resolved.extend(SourceManager::resolve_backfill_splits(
465                &info.stream_job_fragments,
466                actor_no_shuffle,
467                |fragment_id, actor_id| {
468                    database_info
469                        .fragment(fragment_id)
470                        .actors
471                        .get(&actor_id)
472                        .map(|info| info.splits.clone())
473                },
474            )?);
475            Ok(resolved)
476        }
477
478        // Throttle data for creating jobs (set only in the Throttle arm)
479        let mut throttle_for_creating_jobs: Option<(
480            HashSet<JobId>,
481            HashMap<FragmentId, ThrottleConfig>,
482        )> = None;
483
484        // Each variant handles its own pre-apply, edge building, mutation generation,
485        // collect base info, and post-apply. The match produces values consumed by the
486        // common snapshot-backfill-merging code that follows.
487        let (
488            mutation,
489            mut table_ids_to_commit,
490            mut actors_to_create,
491            mut node_actors,
492            post_collect_command,
493        ) = match command {
494            None => self.apply_simple_command(None, "barrier"),
495            Some(Command::CreateStreamingJob {
496                mut info,
497                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
498                cross_db_snapshot_backfill_info,
499            }) => {
500                let ensembles = resolve_no_shuffle_ensembles(
501                    &info.stream_job_fragments,
502                    &info.upstream_fragment_downstreams,
503                )?;
504                let actors = render_actors(
505                    &info.stream_job_fragments,
506                    &self.database_info,
507                    &info.definition,
508                    &info.stream_job_fragments.inner.ctx,
509                    &info.streaming_job_model,
510                    partial_graph_manager
511                        .control_stream_manager()
512                        .env
513                        .actor_id_generator(),
514                    worker_nodes,
515                    adaptive_parallelism_strategy,
516                    &ensembles,
517                    &info.database_resource_group,
518                )?;
519                {
520                    assert!(!self.state.is_paused());
521                    let snapshot_epoch = barrier_info.prev_epoch();
522                    // set snapshot epoch of upstream table for snapshot backfill
523                    for snapshot_backfill_epoch in snapshot_backfill_info
524                        .upstream_mv_table_id_to_backfill_epoch
525                        .values_mut()
526                    {
527                        assert_eq!(
528                            snapshot_backfill_epoch.replace(snapshot_epoch),
529                            None,
530                            "must not set previously"
531                        );
532                    }
533                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
534                        fill_snapshot_backfill_epoch(
535                            &mut fragment.nodes,
536                            Some(&snapshot_backfill_info),
537                            &cross_db_snapshot_backfill_info,
538                        )?;
539                    }
540                    let job_id = info.stream_job_fragments.stream_job_id();
541                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
542                        .upstream_mv_table_id_to_backfill_epoch
543                        .keys()
544                        .cloned()
545                        .collect();
546
547                    // Build edges first (needed for no-shuffle mapping used in split resolution)
548                    let mut edges = self.database_info.build_edge(
549                        Some((&info, true)),
550                        None,
551                        None,
552                        partial_graph_manager.control_stream_manager(),
553                        &actors.stream_actors,
554                        &actors.actor_location,
555                    );
556                    // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
557                    let resolved_split_assignment = resolve_source_splits(
558                        &info,
559                        &actors,
560                        edges.actor_new_no_shuffle(),
561                        &self.database_info,
562                    )?;
563
564                    let Entry::Vacant(entry) =
565                        self.independent_checkpoint_job_controls.entry(job_id)
566                    else {
567                        panic!("duplicated creating snapshot backfill job {job_id}");
568                    };
569
570                    let job = CreatingStreamingJobControl::new(
571                        entry,
572                        CreateSnapshotBackfillJobCommandInfo {
573                            info: info.clone(),
574                            snapshot_backfill_info: snapshot_backfill_info.clone(),
575                            cross_db_snapshot_backfill_info,
576                            resolved_split_assignment: resolved_split_assignment.clone(),
577                        },
578                        take(notifiers),
579                        snapshot_backfill_upstream_tables,
580                        snapshot_epoch,
581                        hummock_version_stats,
582                        partial_graph_manager,
583                        &mut edges,
584                        &resolved_split_assignment,
585                        &actors,
586                    )?;
587
588                    if let Some(fragment_infos) = job.fragment_infos() {
589                        self.database_info.shared_actor_infos.upsert(
590                            self.database_id,
591                            fragment_infos.values().map(|f| (f, job_id)),
592                        );
593                    }
594
595                    for upstream_mv_table_id in snapshot_backfill_info
596                        .upstream_mv_table_id_to_backfill_epoch
597                        .keys()
598                    {
599                        self.database_info.register_subscriber(
600                            upstream_mv_table_id.as_job_id(),
601                            info.streaming_job.id().as_subscriber_id(),
602                            SubscriberType::SnapshotBackfill,
603                        );
604                    }
605
606                    let mutation = Command::create_streaming_job_to_mutation(
607                        &info,
608                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
609                        self.state.is_paused(),
610                        &mut edges,
611                        partial_graph_manager.control_stream_manager(),
612                        None,
613                        &resolved_split_assignment,
614                        &actors.stream_actors,
615                        &actors.actor_location,
616                    )?;
617
618                    let (table_ids, node_actors) = self.collect_base_info();
619                    (
620                        Some(mutation),
621                        table_ids,
622                        None,
623                        node_actors,
624                        PostCollectCommand::barrier(),
625                    )
626                }
627            }
628            Some(Command::CreateStreamingJob {
629                mut info,
630                job_type,
631                cross_db_snapshot_backfill_info,
632            }) => {
633                let ensembles = resolve_no_shuffle_ensembles(
634                    &info.stream_job_fragments,
635                    &info.upstream_fragment_downstreams,
636                )?;
637                let actors = render_actors(
638                    &info.stream_job_fragments,
639                    &self.database_info,
640                    &info.definition,
641                    &info.stream_job_fragments.inner.ctx,
642                    &info.streaming_job_model,
643                    partial_graph_manager
644                        .control_stream_manager()
645                        .env
646                        .actor_id_generator(),
647                    worker_nodes,
648                    adaptive_parallelism_strategy,
649                    &ensembles,
650                    &info.database_resource_group,
651                )?;
652                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
653                    fill_snapshot_backfill_epoch(
654                        &mut fragment.nodes,
655                        None,
656                        &cross_db_snapshot_backfill_info,
657                    )?;
658                }
659
660                // Build edges
661                let new_upstream_sink =
662                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
663                        Some(ctx)
664                    } else {
665                        None
666                    };
667
668                let mut edges = self.database_info.build_edge(
669                    Some((&info, false)),
670                    None,
671                    new_upstream_sink,
672                    partial_graph_manager.control_stream_manager(),
673                    &actors.stream_actors,
674                    &actors.actor_location,
675                );
676                // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
677                let resolved_split_assignment = resolve_source_splits(
678                    &info,
679                    &actors,
680                    edges.actor_new_no_shuffle(),
681                    &self.database_info,
682                )?;
683
                // Tail of the `Command::CreateStreamingJob` arm: pre-apply the new job to
                // the in-memory `database_info`, then build the barrier mutation and the
                // (mutation, table_ids, actors_to_create, node_actors, post-collect) tuple
                // that this match evaluates to.

                // Pre-apply: add new job and fragments.
                // A CDC backfill tracker is only created when the job carries CDC table
                // snapshot splits; it is keyed by the parallel CDC backfill fragment id.
                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            // Invariant: a job that has CDC snapshot splits must contain
                            // a parallel CDC backfill fragment.
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };
                self.database_info
                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
                // Register every new fragment, tagging each with the owning job id.
                self.database_info.pre_apply_new_fragments(
                    info.stream_job_fragments
                        .new_fragment_info(
                            &actors.stream_actors,
                            &actors.actor_location,
                            &resolved_split_assignment,
                        )
                        .map(|(fragment_id, fragment_infos)| {
                            (fragment_id, info.streaming_job.id(), fragment_infos)
                        }),
                );
                // For sink-into-table jobs, the existing downstream (table) fragment gains
                // the newly created sink fragment as an additional upstream.
                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    self.database_info.pre_apply_add_node_upstream(
                        downstream_fragment_id,
                        &PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        },
                    );
                }

                // Snapshot table ids / per-node actors AFTER the pre-apply above, so the
                // new job is included in what this barrier covers.
                let (table_ids, node_actors) = self.collect_base_info();

                // Actors to create
                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
                    &info,
                    &mut edges,
                    &actors.stream_actors,
                    &actors.actor_location,
                ));

                // CDC table snapshot splits: assign backfill splits for the new job
                // (actor-level assignment derived from the tracker registered above).
                let actor_cdc_table_snapshot_splits = self
                    .database_info
                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;

                // Mutation
                let is_currently_paused = self.state.is_paused();
                let mutation = Command::create_streaming_job_to_mutation(
                    &info,
                    &job_type,
                    is_currently_paused,
                    &mut edges,
                    partial_graph_manager.control_stream_manager(),
                    actor_cdc_table_snapshot_splits,
                    &resolved_split_assignment,
                    &actors.stream_actors,
                    &actors.actor_location,
                )?;

                (
                    Some(mutation),
                    table_ids,
                    actors_to_create,
                    node_actors,
                    // Ownership of `info` and friends moves into the post-collect command,
                    // to be finalized once the barrier is collected.
                    PostCollectCommand::CreateStreamingJob {
                        info,
                        job_type,
                        cross_db_snapshot_backfill_info,
                        resolved_split_assignment,
                    },
                )
            }
763
            // Flush: a plain checkpointing barrier with no mutation attached.
            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),

            Some(Command::Pause) => {
                // Flip the paused flag eagerly; the mutation is derived from the
                // *previous* state. `pause_to_mutation` already yields an Option,
                // hence no `Some(..)` wrapper here.
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(true);
                let mutation = Command::pause_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Pause".to_owned()),
                )
            }

            Some(Command::Resume) => {
                // Mirror of Pause: clear the paused flag and build the mutation from
                // the previous state (also an Option already).
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(false);
                let mutation = Command::resume_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Resume".to_owned()),
                )
            }

            Some(Command::Throttle { jobs, config }) => {
                // Record the per-fragment throttle config in the in-memory state, and
                // stash (jobs, config) in `throttle_for_creating_jobs` so jobs that are
                // still being created can be throttled as well (handled outside this match).
                let mutation = Some(Command::throttle_to_mutation(&config));
                for (fragment_id, throttle_config) in &config {
                    self.database_info
                        .pre_apply_throttle(*fragment_id, throttle_config);
                }
                throttle_for_creating_jobs = Some((jobs, config));
                self.apply_simple_command(mutation, "Throttle")
            }
803
            Some(Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
                ..
            }) => {
                // Collect the actor ids of every fragment owned by one of the jobs
                // being dropped; these actors go into the stop mutation below.
                let actors = self
                    .database_info
                    .fragment_infos()
                    .filter(|fragment| {
                        self.database_info
                            .job_id_by_fragment(fragment.fragment_id)
                            .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
                    })
                    .flat_map(|fragment| fragment.actors.keys().copied())
                    .collect::<Vec<_>>();

                // pre_apply: drop node upstream for sink targets
                // (detach dropped sink fragments from the table fragments they fed).
                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
                    self.database_info
                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
                }

                // Snapshot base info between pre-apply and post-apply so this barrier
                // still covers the fragments being removed.
                let (table_ids, node_actors) = self.collect_base_info();

                // post_apply: remove fragments
                self.database_info
                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());

                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
                    &actors,
                    &dropped_sink_fragment_by_targets,
                ));
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::DropStreamingJobs,
                )
            }
845
            Some(Command::RescheduleIntent {
                reschedule_plan, ..
            }) => {
                // Borrow the resolved plan first (`as_ref`); the owned plan is consumed
                // again at the end of this arm to move `reschedules` into the
                // post-collect command.
                let ReschedulePlan {
                    reschedules,
                    fragment_actors,
                } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");

                // Pre-apply: reschedule fragments.
                // For each fragment: (1) register the newly added actors with their
                // worker placement, vnode bitmap, and split assignment; (2) update the
                // vnode bitmaps of surviving actors (newly created ones are excluded
                // since their bitmap was just set); (3) apply the new split assignment.
                for (fragment_id, reschedule) in reschedules {
                    self.database_info.pre_apply_reschedule(
                        *fragment_id,
                        reschedule
                            .added_actors
                            .iter()
                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
                                actors.iter().map(|actor_id| {
                                    (
                                        *actor_id,
                                        InflightActorInfo {
                                            worker_id: *node_id,
                                            vnode_bitmap: reschedule
                                                .newly_created_actors
                                                .get(actor_id)
                                                // Every added actor must appear in
                                                // `newly_created_actors`.
                                                .expect("should exist")
                                                .0
                                                .0
                                                .vnode_bitmap
                                                .clone(),
                                            splits: reschedule
                                                .actor_splits
                                                .get(actor_id)
                                                .cloned()
                                                .unwrap_or_default(),
                                        },
                                    )
                                })
                            })
                            .collect(),
                        reschedule
                            .vnode_bitmap_updates
                            .iter()
                            .filter(|(actor_id, _)| {
                                !reschedule.newly_created_actors.contains_key(*actor_id)
                            })
                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                            .collect(),
                        reschedule.actor_splits.clone(),
                    );
                }

                // Snapshot base info after pre-apply (includes added actors) and before
                // post-apply (still includes the actors being removed).
                let (table_ids, node_actors) = self.collect_base_info();

                // Actors to create
                let actors_to_create = Some(Command::reschedule_actors_to_create(
                    reschedules,
                    fragment_actors,
                    &self.database_info,
                    partial_graph_manager.control_stream_manager(),
                ));

                // Post-apply: remove old actors
                self.database_info
                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.removed_actors.iter().cloned().collect(),
                        )
                    }));

                // Mutation
                let mutation = Command::reschedule_to_mutation(
                    reschedules,
                    fragment_actors,
                    partial_graph_manager.control_stream_manager(),
                    &mut self.database_info,
                )?;

                // Consume the owned plan to hand its reschedules to the post-collect
                // stage (same invariant as the `as_ref().expect` above).
                let reschedules = reschedule_plan
                    .expect("reschedule intent should be resolved in global barrier worker")
                    .reschedules;
                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::Reschedule { reschedules },
                )
            }
937
            Some(Command::ReplaceStreamJob(plan)) => {
                // Replace an existing streaming job's fragment graph in place.
                // Phases: resolve no-shuffle ensembles -> render new actors ->
                // build edges -> resolve source splits -> pre-apply new topology ->
                // build actors-to-create and the mutation -> post-apply removal of
                // the old fragments. Ordering against `database_info` matters
                // throughout (see comments at each phase).
                let ensembles = resolve_no_shuffle_ensembles(
                    &plan.new_fragments,
                    &plan.upstream_fragment_downstreams,
                )?;
                let mut render_result = render_actors(
                    &plan.new_fragments,
                    &self.database_info,
                    "", // replace jobs don't need mview definition
                    &plan.new_fragments.inner.ctx,
                    &plan.streaming_job_model,
                    partial_graph_manager
                        .control_stream_manager()
                        .env
                        .actor_id_generator(),
                    worker_nodes,
                    adaptive_parallelism_strategy,
                    &ensembles,
                    &plan.database_resource_group,
                )?;

                // Render actors for auto_refresh_schema_sinks.
                // Each sink's new_fragment inherits parallelism from its original_fragment.
                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                    let actor_id_counter = partial_graph_manager
                        .control_stream_manager()
                        .env
                        .actor_id_generator();
                    for sink_ctx in sinks {
                        let original_fragment_id = sink_ctx.original_fragment.fragment_id;
                        let original_frag_info = self.database_info.fragment(original_fragment_id);
                        // Use the original fragment's in-flight actors as the template
                        // so the replacement keeps the same worker/vnode layout.
                        let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
                            original_frag_info,
                        );
                        let new_aligner = ComponentFragmentAligner::new_persistent(
                            &actor_template,
                            actor_id_counter,
                        );
                        let distribution_type: DistributionType =
                            sink_ctx.new_fragment.distribution_type.into();
                        let actor_assignments =
                            new_aligner.align_component_actor(distribution_type);
                        let new_fragment_id = sink_ctx.new_fragment.fragment_id;
                        let mut actors = Vec::with_capacity(actor_assignments.len());
                        for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
                            render_result.actor_location.insert(actor_id, *worker_id);
                            actors.push(StreamActor {
                                actor_id,
                                fragment_id: new_fragment_id,
                                vnode_bitmap: vnode_bitmap.clone(),
                                // Sinks carry no materialized-view definition.
                                mview_definition: String::new(),
                                expr_context: Some(sink_ctx.ctx.to_expr_context()),
                                config_override: sink_ctx.ctx.config_override.clone(),
                            });
                        }
                        render_result.stream_actors.insert(new_fragment_id, actors);
                    }
                }

                // Build edges first (needed for no-shuffle mapping used in split resolution)
                let mut edges = self.database_info.build_edge(
                    None,
                    Some(&plan),
                    None,
                    partial_graph_manager.control_stream_manager(),
                    &render_result.stream_actors,
                    &render_result.actor_location,
                );

                // Phase 2: Resolve splits to actor-level assignment.
                let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
                    .stream_actors
                    .iter()
                    .map(|(fragment_id, actors)| {
                        (
                            *fragment_id,
                            actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
                        )
                    })
                    .collect();
                let resolved_split_assignment = match &plan.split_plan {
                    // Fresh discovery: distribute the discovered splits over the
                    // newly rendered actors.
                    ReplaceJobSplitPlan::Discovered(discovered) => {
                        SourceManager::resolve_fragment_to_actor_splits(
                            &plan.new_fragments,
                            discovered,
                            &fragment_actor_ids,
                        )?
                    }
                    // Carry over: align new actors with the split assignment of the
                    // actors they replace, looked up from the in-flight state.
                    ReplaceJobSplitPlan::AlignFromPrevious => {
                        SourceManager::resolve_replace_source_splits(
                            &plan.new_fragments,
                            &plan.replace_upstream,
                            edges.actor_new_no_shuffle(),
                            |_fragment_id, actor_id| {
                                // NOTE(review): scans all fragments for the actor id;
                                // presumably actor ids are globally unique.
                                self.database_info.fragment_infos().find_map(|fragment| {
                                    fragment
                                        .actors
                                        .get(&actor_id)
                                        .map(|info| info.splits.clone())
                                })
                            },
                        )?
                    }
                };

                // Pre-apply: add new fragments and replace upstream
                self.database_info.pre_apply_new_fragments(
                    plan.new_fragments
                        .new_fragment_info(
                            &render_result.stream_actors,
                            &render_result.actor_location,
                            &resolved_split_assignment,
                        )
                        .map(|(fragment_id, new_fragment)| {
                            (fragment_id, plan.streaming_job.id(), new_fragment)
                        }),
                );
                for (fragment_id, replace_map) in &plan.replace_upstream {
                    self.database_info
                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
                }
                // Also register the replacement sink fragments rendered above.
                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                    self.database_info
                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
                            (
                                sink.new_fragment.fragment_id,
                                sink.original_sink.id.as_job_id(),
                                sink.new_fragment_info(
                                    &render_result.stream_actors,
                                    &render_result.actor_location,
                                ),
                            )
                        }));
                }

                let (table_ids, node_actors) = self.collect_base_info();

                // Actors to create
                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
                    &plan,
                    &mut edges,
                    &self.database_info,
                    &render_result.stream_actors,
                    &render_result.actor_location,
                ));

                // Mutation (must be generated before removing old fragments,
                // because it reads actor info from database_info)
                let mutation = Command::replace_stream_job_to_mutation(
                    &plan,
                    &mut edges,
                    &mut self.database_info,
                    &resolved_split_assignment,
                )?;

                // Post-apply: remove old fragments
                {
                    let mut fragment_ids_to_remove: Vec<_> = plan
                        .old_fragments
                        .fragments
                        .values()
                        .map(|f| f.fragment_id)
                        .collect();
                    // The original fragments of auto-refreshed sinks are replaced too.
                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                        fragment_ids_to_remove
                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
                    }
                    self.database_info
                        .post_apply_remove_fragments(fragment_ids_to_remove);
                }

                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::ReplaceStreamJob {
                        plan,
                        resolved_split_assignment,
                    },
                )
            }
1120
            Some(Command::SourceChangeSplit(split_state)) => {
                // Pre-apply: record the new split assignments in the in-memory state
                // before building the mutation / base info.
                self.database_info.pre_apply_split_assignments(
                    split_state
                        .split_assignment
                        .iter()
                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
                );

                let mutation = Some(Command::source_change_split_to_mutation(
                    &split_state.split_assignment,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    // Hand the assignment to the post-collect stage for finalization.
                    PostCollectCommand::SourceChangeSplit {
                        split_assignment: split_state.split_assignment,
                    },
                )
            }

            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Register the subscriber on the upstream MV's job before emitting
                // the mutation, so retention takes effect from this barrier on.
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }
1168
            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // Unregistering a missing subscription is tolerated (best-effort drop);
                // only log a warning rather than failing the barrier.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Pure metadata update: adjust retention in the in-memory state and
                // emit a plain barrier with no mutation.
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }
1209
            Some(Command::ConnectorPropsChange(config)) => {
                // Broadcast new connector properties; the config is kept for the
                // post-collect stage.
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            // Refreshable-table commands: each wraps its parameters into a dedicated
            // mutation and otherwise behaves like a simple command.
            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                // Mutation construction is fallible and consults the in-flight state;
                // the target is forwarded to the post-collect stage.
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
1273        };
1274
1275        let mut finished_snapshot_backfill_jobs = HashSet::new();
1276        let mutation = match mutation {
1277            Some(mutation) => Some(mutation),
1278            None => {
1279                let mut finished_snapshot_backfill_job_info = HashMap::new();
1280                if barrier_info.kind.is_checkpoint() {
1281                    for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
1282                        let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
1283                            job;
1284                        if creating_job.should_merge_to_upstream() {
1285                            let info = creating_job
1286                                .start_consume_upstream(partial_graph_manager, &barrier_info)?;
1287                            finished_snapshot_backfill_job_info
1288                                .try_insert(job_id, info)
1289                                .expect("non-duplicated");
1290                        }
1291                    }
1292                }
1293
1294                if !finished_snapshot_backfill_job_info.is_empty() {
1295                    let actors_to_create = actors_to_create.get_or_insert_default();
1296                    let mut subscriptions_to_drop = vec![];
1297                    let mut dispatcher_update = vec![];
1298                    let mut actor_splits = HashMap::new();
1299                    for (job_id, info) in finished_snapshot_backfill_job_info {
1300                        finished_snapshot_backfill_jobs.insert(job_id);
1301                        subscriptions_to_drop.extend(
1302                            info.snapshot_backfill_upstream_tables.iter().map(
1303                                |upstream_table_id| PbSubscriptionUpstreamInfo {
1304                                    subscriber_id: job_id.as_subscriber_id(),
1305                                    upstream_mv_table_id: *upstream_table_id,
1306                                },
1307                            ),
1308                        );
1309                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1310                            assert_matches!(
1311                                self.database_info.unregister_subscriber(
1312                                    upstream_mv_table_id.as_job_id(),
1313                                    job_id.as_subscriber_id()
1314                                ),
1315                                Some(SubscriberType::SnapshotBackfill)
1316                            );
1317                        }
1318
1319                        table_ids_to_commit.extend(
1320                            info.fragment_infos
1321                                .values()
1322                                .flat_map(|fragment| fragment.state_table_ids.iter())
1323                                .copied(),
1324                        );
1325
1326                        let actor_len = info
1327                            .fragment_infos
1328                            .values()
1329                            .map(|fragment| fragment.actors.len() as u64)
1330                            .sum();
1331                        let id_gen = GlobalActorIdGen::new(
1332                            partial_graph_manager
1333                                .control_stream_manager()
1334                                .env
1335                                .actor_id_generator(),
1336                            actor_len,
1337                        );
1338                        let mut next_local_actor_id = 0;
1339                        // mapping from old_actor_id to new_actor_id
1340                        let actor_mapping: HashMap<_, _> = info
1341                            .fragment_infos
1342                            .values()
1343                            .flat_map(|fragment| fragment.actors.keys())
1344                            .map(|old_actor_id| {
1345                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1346                                next_local_actor_id += 1;
1347                                (*old_actor_id, new_actor_id.as_global_id())
1348                            })
1349                            .collect();
1350                        let actor_mapping = &actor_mapping;
1351                        let new_stream_actors: HashMap<_, _> = info
1352                            .stream_actors
1353                            .into_iter()
1354                            .map(|(old_actor_id, mut actor)| {
1355                                let new_actor_id = actor_mapping[&old_actor_id];
1356                                actor.actor_id = new_actor_id;
1357                                (new_actor_id, actor)
1358                            })
1359                            .collect();
1360                        let new_fragment_info: HashMap<_, _> = info
1361                            .fragment_infos
1362                            .into_iter()
1363                            .map(|(fragment_id, mut fragment)| {
1364                                let actors = take(&mut fragment.actors);
1365                                fragment.actors = actors
1366                                    .into_iter()
1367                                    .map(|(old_actor_id, actor)| {
1368                                        let new_actor_id = actor_mapping[&old_actor_id];
1369                                        (new_actor_id, actor)
1370                                    })
1371                                    .collect();
1372                                (fragment_id, fragment)
1373                            })
1374                            .collect();
1375                        actor_splits.extend(
1376                            new_fragment_info
1377                                .values()
1378                                .flat_map(|fragment| &fragment.actors)
1379                                .map(|(actor_id, actor)| {
1380                                    (
1381                                        *actor_id,
1382                                        ConnectorSplits {
1383                                            splits: actor
1384                                                .splits
1385                                                .iter()
1386                                                .map(ConnectorSplit::from)
1387                                                .collect(),
1388                                        },
1389                                    )
1390                                }),
1391                        );
1392                        // new actors belong to the database partial graph
1393                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
1394                        let mut edge_builder = FragmentEdgeBuilder::new(
1395                            info.upstream_fragment_downstreams
1396                                .keys()
1397                                .map(|upstream_fragment_id| {
1398                                    self.database_info.fragment(*upstream_fragment_id)
1399                                })
1400                                .chain(new_fragment_info.values())
1401                                .map(|fragment| {
1402                                    (
1403                                        fragment.fragment_id,
1404                                        EdgeBuilderFragmentInfo::from_inflight(
1405                                            fragment,
1406                                            partial_graph_id,
1407                                            partial_graph_manager.control_stream_manager(),
1408                                        ),
1409                                    )
1410                                }),
1411                        );
1412                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
1413                        edge_builder.add_relations(&info.downstreams);
1414                        let mut edges = edge_builder.build();
1415                        let new_actors_to_create = edges.collect_actors_to_create(
1416                            new_fragment_info.values().map(|fragment| {
1417                                (
1418                                    fragment.fragment_id,
1419                                    &fragment.nodes,
1420                                    fragment.actors.iter().map(|(actor_id, actor)| {
1421                                        (&new_stream_actors[actor_id], actor.worker_id)
1422                                    }),
1423                                    [], // no initial subscriber for backfilling job
1424                                )
1425                            }),
1426                        );
1427                        dispatcher_update.extend(
1428                            info.upstream_fragment_downstreams.keys().flat_map(
1429                                |upstream_fragment_id| {
1430                                    let new_actor_dispatchers = edges
1431                                        .dispatchers
1432                                        .remove(upstream_fragment_id)
1433                                        .expect("should exist");
1434                                    new_actor_dispatchers.into_iter().flat_map(
1435                                        |(upstream_actor_id, dispatchers)| {
1436                                            dispatchers.into_iter().map(move |dispatcher| {
1437                                                PbDispatcherUpdate {
1438                                                    actor_id: upstream_actor_id,
1439                                                    dispatcher_id: dispatcher.dispatcher_id,
1440                                                    hash_mapping: dispatcher.hash_mapping,
1441                                                    removed_downstream_actor_id: dispatcher
1442                                                        .downstream_actor_id
1443                                                        .iter()
1444                                                        .map(|new_downstream_actor_id| {
1445                                                            actor_mapping
1446                                                            .iter()
1447                                                            .find_map(
1448                                                                |(old_actor_id, new_actor_id)| {
1449                                                                    (new_downstream_actor_id
1450                                                                        == new_actor_id)
1451                                                                        .then_some(*old_actor_id)
1452                                                                },
1453                                                            )
1454                                                            .expect("should exist")
1455                                                        })
1456                                                        .collect(),
1457                                                    added_downstream_actor_id: dispatcher
1458                                                        .downstream_actor_id,
1459                                                }
1460                                            })
1461                                        },
1462                                    )
1463                                },
1464                            ),
1465                        );
1466                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1467                        for (worker_id, worker_actors) in new_actors_to_create {
1468                            node_actors.entry(worker_id).or_default().extend(
1469                                worker_actors.values().flat_map(|(_, actors, _)| {
1470                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
1471                                }),
1472                            );
1473                            actors_to_create
1474                                .entry(worker_id)
1475                                .or_default()
1476                                .extend(worker_actors);
1477                        }
1478                        self.database_info.add_existing(InflightStreamingJobInfo {
1479                            job_id,
1480                            fragment_infos: new_fragment_info,
1481                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
1482                            status: CreateStreamingJobStatus::Created,
1483                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
1484                        });
1485                    }
1486
1487                    Some(PbMutation::Update(PbUpdateMutation {
1488                        dispatcher_update,
1489                        merge_update: vec![], // no upstream update on existing actors
1490                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
1491                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
1492                        actor_splits,
1493                        actor_new_dispatchers: Default::default(), // no new dispatcher
1494                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
1495                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
1496                        subscriptions_to_drop,
1497                    }))
1498                } else {
1499                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
1500                    if fragment_ids.is_empty() {
1501                        None
1502                    } else {
1503                        Some(PbMutation::StartFragmentBackfill(
1504                            PbStartFragmentBackfillMutation { fragment_ids },
1505                        ))
1506                    }
1507                }
1508            }
1509        };
1510
1511        // Forward barrier to independent checkpoint job controls
1512        for (job_id, job) in &mut self.independent_checkpoint_job_controls {
1513            let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) = job;
1514            if !finished_snapshot_backfill_jobs.contains(job_id) {
1515                let throttle_mutation = if let Some((ref jobs, ref config)) =
1516                    throttle_for_creating_jobs
1517                    && jobs.contains(job_id)
1518                {
1519                    assert_eq!(
1520                        jobs.len(),
1521                        1,
1522                        "should not alter rate limit of snapshot backfill job with other jobs"
1523                    );
1524                    Some((
1525                        Mutation::Throttle(ThrottleMutation {
1526                            fragment_throttle: config
1527                                .iter()
1528                                .map(|(fragment_id, config)| (*fragment_id, *config))
1529                                .collect(),
1530                        }),
1531                        take(notifiers),
1532                    ))
1533                } else {
1534                    None
1535                };
1536                creating_job.on_new_upstream_barrier(
1537                    partial_graph_manager,
1538                    &barrier_info,
1539                    throttle_mutation,
1540                )?;
1541            }
1542        }
1543
1544        partial_graph_manager.inject_barrier(
1545            to_partial_graph_id(self.database_id, None),
1546            mutation,
1547            &node_actors,
1548            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1549            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1550            actors_to_create,
1551            PartialGraphBarrierInfo::new(
1552                post_collect_command,
1553                barrier_info,
1554                take(notifiers),
1555                table_ids_to_commit,
1556            ),
1557        )?;
1558
1559        Ok(ApplyCommandInfo {
1560            jobs_to_wait: finished_snapshot_backfill_jobs,
1561        })
1562    }
1563}