risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38    PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
45use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
48use crate::barrier::info::{
49    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
50    SubscriberType,
51};
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::PartialGraphManager;
54use crate::barrier::rpc::to_partial_graph_id;
55use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::scale::{
58    ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
59    build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
60};
61use crate::model::{
62    ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
63    StreamJobActorsToCreate, StreamJobFragmentsToCreate,
64};
65use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
66use crate::stream::{
67    GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
68    fill_snapshot_backfill_epoch,
69};
70
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers.
    /// Accumulated by `next_barrier_info` and drained into
    /// `BarrierKind::Checkpoint` when the next checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    is_paused: bool,
}
85
86impl BarrierWorkerState {
87    pub(super) fn new() -> Self {
88        Self {
89            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
90            pending_non_checkpoint_barriers: vec![],
91            is_paused: false,
92        }
93    }
94
95    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
96        Self {
97            in_flight_prev_epoch,
98            pending_non_checkpoint_barriers: vec![],
99            is_paused,
100        }
101    }
102
103    pub fn is_paused(&self) -> bool {
104        self.is_paused
105    }
106
107    fn set_is_paused(&mut self, is_paused: bool) {
108        if self.is_paused != is_paused {
109            tracing::info!(
110                currently_paused = self.is_paused,
111                newly_paused = is_paused,
112                "update paused state"
113            );
114            self.is_paused = is_paused;
115        }
116    }
117
118    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
119        &self.in_flight_prev_epoch
120    }
121
122    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
123    pub fn next_barrier_info(
124        &mut self,
125        is_checkpoint: bool,
126        curr_epoch: TracedEpoch,
127    ) -> BarrierInfo {
128        assert!(
129            self.in_flight_prev_epoch.value() < curr_epoch.value(),
130            "curr epoch regress. {} > {}",
131            self.in_flight_prev_epoch.value(),
132            curr_epoch.value()
133        );
134        let prev_epoch = self.in_flight_prev_epoch.clone();
135        self.in_flight_prev_epoch = curr_epoch.clone();
136        self.pending_non_checkpoint_barriers
137            .push(prev_epoch.value().0);
138        let kind = if is_checkpoint {
139            let epochs = take(&mut self.pending_non_checkpoint_barriers);
140            BarrierKind::Checkpoint(epochs)
141        } else {
142            BarrierKind::Barrier
143        };
144        BarrierInfo {
145            prev_epoch,
146            curr_epoch,
147            kind,
148        }
149    }
150}
151
/// Aggregated output of `DatabaseCheckpointControl::apply_command`, consumed by
/// the caller to inject the barrier and drive post-collection handling.
pub(super) struct ApplyCommandInfo {
    /// Max retention value per upstream MV table that has subscriptions.
    /// NOTE(review): unit not shown in this chunk — presumably epochs/time; confirm at use site.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// Table IDs whose state is committed with this barrier.
    pub table_ids_to_commit: HashSet<TableId>,
    /// Creating jobs whose progress this command must wait for before completing.
    pub jobs_to_wait: HashSet<JobId>,
    /// The command to execute after the barrier is collected.
    pub command: PostCollectCommand,
}
158
/// Result tuple of `apply_command`, in order:
/// (mutation to inject, table IDs to commit, actors to create,
/// actor IDs to collect per worker, post-collect command).
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
168
/// Result of actor rendering for a create/replace streaming job.
pub(super) struct RenderResult {
    /// Rendered actors grouped by fragment.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker placement for each rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
176
177/// Derive `NoShuffle` edges from fragment downstream relations and resolve ensembles.
178///
179/// This scans both the internal downstream relations (`fragments.downstreams`) and
180/// the cross-boundary upstream-to-new-fragment relations (`upstream_fragment_downstreams`)
181/// to find all `NoShuffle` edges. It then runs BFS to find connected components (ensembles)
182/// and categorizes them into:
183/// - Ensembles whose entry fragments include existing (non-new) fragments
184/// - Ensembles whose entry fragments are all newly created
185fn resolve_no_shuffle_ensembles(
186    fragments: &StreamJobFragmentsToCreate,
187    upstream_fragment_downstreams: &FragmentDownstreamRelation,
188) -> MetaResult<Vec<NoShuffleEnsemble>> {
189    // Derive FragmentNewNoShuffle from the two downstream relation maps.
190    let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
191
192    // Internal edges (new → new) and edges from new → existing downstream (replace job).
193    for (upstream_fid, relations) in &fragments.downstreams {
194        for rel in relations {
195            if rel.dispatcher_type == DispatcherType::NoShuffle {
196                new_no_shuffle
197                    .entry(*upstream_fid)
198                    .or_default()
199                    .insert(rel.downstream_fragment_id);
200            }
201        }
202    }
203
204    // Cross-boundary edges: existing upstream → new downstream.
205    for (upstream_fid, relations) in upstream_fragment_downstreams {
206        for rel in relations {
207            if rel.dispatcher_type == DispatcherType::NoShuffle {
208                new_no_shuffle
209                    .entry(*upstream_fid)
210                    .or_default()
211                    .insert(rel.downstream_fragment_id);
212            }
213        }
214    }
215
216    let mut ensembles = if new_no_shuffle.is_empty() {
217        Vec::new()
218    } else {
219        // Flatten into directed edge pairs for BFS.
220        let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
221            .iter()
222            .flat_map(|(upstream_fid, downstream_fids)| {
223                downstream_fids
224                    .iter()
225                    .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
226            })
227            .collect();
228
229        let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
230            .iter()
231            .flat_map(|(u, d)| [*u, *d])
232            .collect::<HashSet<_>>()
233            .into_iter()
234            .collect();
235
236        let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
237        find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
238    };
239
240    // Add standalone fragments (not covered by any ensemble) as single-fragment ensembles.
241    let covered: HashSet<FragmentId> = ensembles
242        .iter()
243        .flat_map(|e| e.component_fragments())
244        .collect();
245    for fragment_id in fragments.inner.fragments.keys() {
246        if !covered.contains(fragment_id) {
247            ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
248        }
249    }
250
251    Ok(ensembles)
252}
253
/// Render actors for a create or replace streaming job.
///
/// This determines the parallelism for each no-shuffle ensemble (either from an existing
/// inflight upstream or computed fresh), and produces `StreamActor` instances with worker
/// placements and actor-level no-shuffle mappings.
///
/// The process follows three steps:
/// 1. For each ensemble, resolve `EnsembleActorTemplate` (from existing or fresh).
/// 2. For each new component fragment, allocate actor IDs and compute worker/vnode assignments.
/// 3. Expand the simple assignments into full `StreamActor` structures.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // Steps 1 & 2 run per ensemble in the loop below; step 3 then expands the
    // collected assignments. For each new fragment we produce a simple
    // assignment: actor_id -> (worker_id, vnode_bitmap).
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Step 1: Determine the EnsembleActorTemplate for this ensemble.
        //
        // Check if any component fragment in the ensemble already exists (i.e. is inflight).
        // If so, derive the actor assignment from an existing fragment. Otherwise render fresh.
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // Sanity check: all existing fragments in the same ensemble must be aligned —
            // same actor count and same worker placement per vnode.
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // All fragments are new — render from scratch, using the first
            // component as the representative for distribution and vnode count.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // Assert all component fragments in this ensemble share the same vnode count.
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                     different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Step 2: Render each new component fragment in this ensemble,
        // allocating actor IDs and worker/vnode assignments from the template.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                continue; // Skip existing fragments.
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Step 3: Expand simple assignments into full StreamActor structures.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
381impl DatabaseCheckpointControl {
382    /// Collect table IDs to commit and actor IDs to collect from current fragment infos.
383    fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
384        let table_ids_to_commit = self.database_info.existing_table_ids().collect();
385        let node_actors =
386            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
387        (table_ids_to_commit, node_actors)
388    }
389
390    /// Helper for the simplest command variants: those that only need a
391    /// pre-computed mutation and a command name, with no actors to create
392    /// and no additional side effects on `self`.
393    fn apply_simple_command(
394        &self,
395        mutation: Option<Mutation>,
396        command_name: &'static str,
397    ) -> ApplyCommandResult {
398        let (table_ids, node_actors) = self.collect_base_info();
399        (
400            mutation,
401            table_ids,
402            None,
403            node_actors,
404            PostCollectCommand::Command(command_name.to_owned()),
405        )
406    }
407
408    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
409    /// will be removed from the state after the info get resolved.
410    pub(super) fn apply_command(
411        &mut self,
412        command: Option<Command>,
413        notifiers: &mut Vec<Notifier>,
414        barrier_info: &BarrierInfo,
415        partial_graph_manager: &mut PartialGraphManager,
416        hummock_version_stats: &HummockVersionStats,
417        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
418        worker_nodes: &HashMap<WorkerId, WorkerNode>,
419    ) -> MetaResult<ApplyCommandInfo> {
420        debug_assert!(
421            !matches!(
422                command,
423                Some(Command::RescheduleIntent {
424                    reschedule_plan: None,
425                    ..
426                })
427            ),
428            "reschedule intent must be resolved before apply"
429        );
430        if matches!(
431            command,
432            Some(Command::RescheduleIntent {
433                reschedule_plan: None,
434                ..
435            })
436        ) {
437            bail!("reschedule intent must be resolved before apply");
438        }
439
        /// Resolve source splits for a create streaming job command.
        ///
        /// Combines source fragment split resolution and backfill split alignment
        /// into one step, looking up existing upstream actor splits from the inflight database info.
        ///
        /// Returns the merged actor-level `SplitAssignment` covering both the job's
        /// own source fragments and its backfill fragments.
        fn resolve_source_splits(
            info: &CreateStreamingJobCommandInfo,
            render_result: &RenderResult,
            actor_no_shuffle: &ActorNewNoShuffle,
            database_info: &InflightDatabaseInfo,
        ) -> MetaResult<SplitAssignment> {
            // Map each rendered fragment to its actor IDs, as expected by the resolver.
            let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
                .stream_actors
                .iter()
                .map(|(fragment_id, actors)| {
                    (
                        *fragment_id,
                        actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
                    )
                })
                .collect();
            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
                &info.stream_job_fragments,
                &info.init_split_assignment,
                &fragment_actor_ids,
            )?;
            // Align backfill actors with the splits of their no-shuffle upstream
            // actors, looked up from the inflight database info.
            resolved.extend(SourceManager::resolve_backfill_splits(
                &info.stream_job_fragments,
                actor_no_shuffle,
                |fragment_id, actor_id| {
                    database_info
                        .fragment(fragment_id)
                        .actors
                        .get(&actor_id)
                        .map(|info| info.splits.clone())
                },
            )?);
            Ok(resolved)
        }
478
479        // Throttle data for creating jobs (set only in the Throttle arm)
480        let mut throttle_for_creating_jobs: Option<(
481            HashSet<JobId>,
482            HashMap<FragmentId, ThrottleConfig>,
483        )> = None;
484
485        // Each variant handles its own pre-apply, edge building, mutation generation,
486        // collect base info, and post-apply. The match produces values consumed by the
487        // common snapshot-backfill-merging code that follows.
488        let (
489            mutation,
490            mut table_ids_to_commit,
491            mut actors_to_create,
492            mut node_actors,
493            post_collect_command,
494        ) = match command {
495            None => self.apply_simple_command(None, "barrier"),
496            Some(Command::CreateStreamingJob {
497                mut info,
498                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
499                cross_db_snapshot_backfill_info,
500            }) => {
501                let ensembles = resolve_no_shuffle_ensembles(
502                    &info.stream_job_fragments,
503                    &info.upstream_fragment_downstreams,
504                )?;
505                let actors = render_actors(
506                    &info.stream_job_fragments,
507                    &self.database_info,
508                    &info.definition,
509                    &info.stream_job_fragments.inner.ctx,
510                    &info.streaming_job_model,
511                    partial_graph_manager
512                        .control_stream_manager()
513                        .env
514                        .actor_id_generator(),
515                    worker_nodes,
516                    adaptive_parallelism_strategy,
517                    &ensembles,
518                    &info.database_resource_group,
519                )?;
520                {
521                    assert!(!self.state.is_paused());
522                    let snapshot_epoch = barrier_info.prev_epoch();
523                    // set snapshot epoch of upstream table for snapshot backfill
524                    for snapshot_backfill_epoch in snapshot_backfill_info
525                        .upstream_mv_table_id_to_backfill_epoch
526                        .values_mut()
527                    {
528                        assert_eq!(
529                            snapshot_backfill_epoch.replace(snapshot_epoch),
530                            None,
531                            "must not set previously"
532                        );
533                    }
534                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
535                        fill_snapshot_backfill_epoch(
536                            &mut fragment.nodes,
537                            Some(&snapshot_backfill_info),
538                            &cross_db_snapshot_backfill_info,
539                        )?;
540                    }
541                    let job_id = info.stream_job_fragments.stream_job_id();
542                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
543                        .upstream_mv_table_id_to_backfill_epoch
544                        .keys()
545                        .cloned()
546                        .collect();
547
548                    // Build edges first (needed for no-shuffle mapping used in split resolution)
549                    let mut edges = self.database_info.build_edge(
550                        Some((&info, true)),
551                        None,
552                        None,
553                        partial_graph_manager.control_stream_manager(),
554                        &actors.stream_actors,
555                        &actors.actor_location,
556                    );
557                    // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
558                    let resolved_split_assignment = resolve_source_splits(
559                        &info,
560                        &actors,
561                        edges.actor_new_no_shuffle(),
562                        &self.database_info,
563                    )?;
564
565                    let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
566                    else {
567                        panic!("duplicated creating snapshot backfill job {job_id}");
568                    };
569
570                    let job = CreatingStreamingJobControl::new(
571                        entry,
572                        CreateSnapshotBackfillJobCommandInfo {
573                            info: info.clone(),
574                            snapshot_backfill_info: snapshot_backfill_info.clone(),
575                            cross_db_snapshot_backfill_info,
576                            resolved_split_assignment: resolved_split_assignment.clone(),
577                        },
578                        take(notifiers),
579                        snapshot_backfill_upstream_tables,
580                        snapshot_epoch,
581                        hummock_version_stats,
582                        partial_graph_manager,
583                        &mut edges,
584                        &resolved_split_assignment,
585                        &actors,
586                    )?;
587
588                    self.database_info
589                        .shared_actor_infos
590                        .upsert(self.database_id, job.fragment_infos_with_job_id());
591
592                    for upstream_mv_table_id in snapshot_backfill_info
593                        .upstream_mv_table_id_to_backfill_epoch
594                        .keys()
595                    {
596                        self.database_info.register_subscriber(
597                            upstream_mv_table_id.as_job_id(),
598                            info.streaming_job.id().as_subscriber_id(),
599                            SubscriberType::SnapshotBackfill,
600                        );
601                    }
602
603                    let mutation = Command::create_streaming_job_to_mutation(
604                        &info,
605                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
606                        self.state.is_paused(),
607                        &mut edges,
608                        partial_graph_manager.control_stream_manager(),
609                        None,
610                        &resolved_split_assignment,
611                        &actors.stream_actors,
612                        &actors.actor_location,
613                    )?;
614
615                    let (table_ids, node_actors) = self.collect_base_info();
616                    (
617                        Some(mutation),
618                        table_ids,
619                        None,
620                        node_actors,
621                        PostCollectCommand::barrier(),
622                    )
623                }
624            }
625            Some(Command::CreateStreamingJob {
626                mut info,
627                job_type,
628                cross_db_snapshot_backfill_info,
629            }) => {
630                let ensembles = resolve_no_shuffle_ensembles(
631                    &info.stream_job_fragments,
632                    &info.upstream_fragment_downstreams,
633                )?;
634                let actors = render_actors(
635                    &info.stream_job_fragments,
636                    &self.database_info,
637                    &info.definition,
638                    &info.stream_job_fragments.inner.ctx,
639                    &info.streaming_job_model,
640                    partial_graph_manager
641                        .control_stream_manager()
642                        .env
643                        .actor_id_generator(),
644                    worker_nodes,
645                    adaptive_parallelism_strategy,
646                    &ensembles,
647                    &info.database_resource_group,
648                )?;
649                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
650                    fill_snapshot_backfill_epoch(
651                        &mut fragment.nodes,
652                        None,
653                        &cross_db_snapshot_backfill_info,
654                    )?;
655                }
656
657                // Build edges
658                let new_upstream_sink =
659                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
660                        Some(ctx)
661                    } else {
662                        None
663                    };
664
665                let mut edges = self.database_info.build_edge(
666                    Some((&info, false)),
667                    None,
668                    new_upstream_sink,
669                    partial_graph_manager.control_stream_manager(),
670                    &actors.stream_actors,
671                    &actors.actor_location,
672                );
673                // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
674                let resolved_split_assignment = resolve_source_splits(
675                    &info,
676                    &actors,
677                    edges.actor_new_no_shuffle(),
678                    &self.database_info,
679                )?;
680
681                // Pre-apply: add new job and fragments
682                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
683                    let (fragment, _) =
684                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
685                            .expect("should have parallel cdc fragment");
686                    Some(CdcTableBackfillTracker::new(
687                        fragment.fragment_id,
688                        splits.clone(),
689                    ))
690                } else {
691                    None
692                };
693                self.database_info
694                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
695                self.database_info.pre_apply_new_fragments(
696                    info.stream_job_fragments
697                        .new_fragment_info(
698                            &actors.stream_actors,
699                            &actors.actor_location,
700                            &resolved_split_assignment,
701                        )
702                        .map(|(fragment_id, fragment_infos)| {
703                            (fragment_id, info.streaming_job.id(), fragment_infos)
704                        }),
705                );
706                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
707                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
708                    self.database_info.pre_apply_add_node_upstream(
709                        downstream_fragment_id,
710                        &PbUpstreamSinkInfo {
711                            upstream_fragment_id: ctx.sink_fragment_id,
712                            sink_output_schema: ctx.sink_output_fields.clone(),
713                            project_exprs: ctx.project_exprs.clone(),
714                        },
715                    );
716                }
717
718                let (table_ids, node_actors) = self.collect_base_info();
719
720                // Actors to create
721                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
722                    &info,
723                    &mut edges,
724                    &actors.stream_actors,
725                    &actors.actor_location,
726                ));
727
728                // CDC table snapshot splits
729                let actor_cdc_table_snapshot_splits = self
730                    .database_info
731                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
732
733                // Mutation
734                let is_currently_paused = self.state.is_paused();
735                let mutation = Command::create_streaming_job_to_mutation(
736                    &info,
737                    &job_type,
738                    is_currently_paused,
739                    &mut edges,
740                    partial_graph_manager.control_stream_manager(),
741                    actor_cdc_table_snapshot_splits,
742                    &resolved_split_assignment,
743                    &actors.stream_actors,
744                    &actors.actor_location,
745                )?;
746
747                (
748                    Some(mutation),
749                    table_ids,
750                    actors_to_create,
751                    node_actors,
752                    PostCollectCommand::CreateStreamingJob {
753                        info,
754                        job_type,
755                        cross_db_snapshot_backfill_info,
756                        resolved_split_assignment,
757                    },
758                )
759            }
760
761            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
762
763            Some(Command::Pause) => {
764                let prev_is_paused = self.state.is_paused();
765                self.state.set_is_paused(true);
766                let mutation = Command::pause_to_mutation(prev_is_paused);
767                let (table_ids, node_actors) = self.collect_base_info();
768                (
769                    mutation,
770                    table_ids,
771                    None,
772                    node_actors,
773                    PostCollectCommand::Command("Pause".to_owned()),
774                )
775            }
776
777            Some(Command::Resume) => {
778                let prev_is_paused = self.state.is_paused();
779                self.state.set_is_paused(false);
780                let mutation = Command::resume_to_mutation(prev_is_paused);
781                let (table_ids, node_actors) = self.collect_base_info();
782                (
783                    mutation,
784                    table_ids,
785                    None,
786                    node_actors,
787                    PostCollectCommand::Command("Resume".to_owned()),
788                )
789            }
790
791            Some(Command::Throttle { jobs, config }) => {
792                let mutation = Some(Command::throttle_to_mutation(&config));
793                throttle_for_creating_jobs = Some((jobs, config));
794                self.apply_simple_command(mutation, "Throttle")
795            }
796
797            Some(Command::DropStreamingJobs {
798                streaming_job_ids,
799                unregistered_state_table_ids,
800                unregistered_fragment_ids,
801                dropped_sink_fragment_by_targets,
802            }) => {
803                let actors = self
804                    .database_info
805                    .fragment_infos()
806                    .filter(|fragment| {
807                        self.database_info
808                            .job_id_by_fragment(fragment.fragment_id)
809                            .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
810                    })
811                    .flat_map(|fragment| fragment.actors.keys().copied())
812                    .collect::<Vec<_>>();
813
814                // pre_apply: drop node upstream for sink targets
815                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
816                    self.database_info
817                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
818                }
819
820                let (table_ids, node_actors) = self.collect_base_info();
821
822                // post_apply: remove fragments
823                self.database_info
824                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
825
826                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
827                    &actors,
828                    &dropped_sink_fragment_by_targets,
829                ));
830                (
831                    mutation,
832                    table_ids,
833                    None,
834                    node_actors,
835                    PostCollectCommand::DropStreamingJobs {
836                        streaming_job_ids,
837                        unregistered_state_table_ids,
838                    },
839                )
840            }
841
842            Some(Command::RescheduleIntent {
843                reschedule_plan, ..
844            }) => {
845                let ReschedulePlan {
846                    reschedules,
847                    fragment_actors,
848                } = reschedule_plan
849                    .as_ref()
850                    .expect("reschedule intent should be resolved in global barrier worker");
851
852                // Pre-apply: reschedule fragments
853                for (fragment_id, reschedule) in reschedules {
854                    self.database_info.pre_apply_reschedule(
855                        *fragment_id,
856                        reschedule
857                            .added_actors
858                            .iter()
859                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
860                                actors.iter().map(|actor_id| {
861                                    (
862                                        *actor_id,
863                                        InflightActorInfo {
864                                            worker_id: *node_id,
865                                            vnode_bitmap: reschedule
866                                                .newly_created_actors
867                                                .get(actor_id)
868                                                .expect("should exist")
869                                                .0
870                                                .0
871                                                .vnode_bitmap
872                                                .clone(),
873                                            splits: reschedule
874                                                .actor_splits
875                                                .get(actor_id)
876                                                .cloned()
877                                                .unwrap_or_default(),
878                                        },
879                                    )
880                                })
881                            })
882                            .collect(),
883                        reschedule
884                            .vnode_bitmap_updates
885                            .iter()
886                            .filter(|(actor_id, _)| {
887                                !reschedule.newly_created_actors.contains_key(*actor_id)
888                            })
889                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
890                            .collect(),
891                        reschedule.actor_splits.clone(),
892                    );
893                }
894
895                let (table_ids, node_actors) = self.collect_base_info();
896
897                // Actors to create
898                let actors_to_create = Some(Command::reschedule_actors_to_create(
899                    reschedules,
900                    fragment_actors,
901                    &self.database_info,
902                    partial_graph_manager.control_stream_manager(),
903                ));
904
905                // Post-apply: remove old actors
906                self.database_info
907                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
908                        (
909                            *fragment_id,
910                            reschedule.removed_actors.iter().cloned().collect(),
911                        )
912                    }));
913
914                // Mutation
915                let mutation = Command::reschedule_to_mutation(
916                    reschedules,
917                    fragment_actors,
918                    partial_graph_manager.control_stream_manager(),
919                    &mut self.database_info,
920                )?;
921
922                let reschedules = reschedule_plan
923                    .expect("reschedule intent should be resolved in global barrier worker")
924                    .reschedules;
925                (
926                    mutation,
927                    table_ids,
928                    actors_to_create,
929                    node_actors,
930                    PostCollectCommand::Reschedule { reschedules },
931                )
932            }
933
934            Some(Command::ReplaceStreamJob(plan)) => {
935                let ensembles = resolve_no_shuffle_ensembles(
936                    &plan.new_fragments,
937                    &plan.upstream_fragment_downstreams,
938                )?;
939                let mut render_result = render_actors(
940                    &plan.new_fragments,
941                    &self.database_info,
942                    "", // replace jobs don't need mview definition
943                    &plan.new_fragments.inner.ctx,
944                    &plan.streaming_job_model,
945                    partial_graph_manager
946                        .control_stream_manager()
947                        .env
948                        .actor_id_generator(),
949                    worker_nodes,
950                    adaptive_parallelism_strategy,
951                    &ensembles,
952                    &plan.database_resource_group,
953                )?;
954
955                // Render actors for auto_refresh_schema_sinks.
956                // Each sink's new_fragment inherits parallelism from its original_fragment.
957                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
958                    let actor_id_counter = partial_graph_manager
959                        .control_stream_manager()
960                        .env
961                        .actor_id_generator();
962                    for sink_ctx in sinks {
963                        let original_fragment_id = sink_ctx.original_fragment.fragment_id;
964                        let original_frag_info = self.database_info.fragment(original_fragment_id);
965                        let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
966                            original_frag_info,
967                        );
968                        let new_aligner = ComponentFragmentAligner::new_persistent(
969                            &actor_template,
970                            actor_id_counter,
971                        );
972                        let distribution_type: DistributionType =
973                            sink_ctx.new_fragment.distribution_type.into();
974                        let actor_assignments =
975                            new_aligner.align_component_actor(distribution_type);
976                        let new_fragment_id = sink_ctx.new_fragment.fragment_id;
977                        let mut actors = Vec::with_capacity(actor_assignments.len());
978                        for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
979                            render_result.actor_location.insert(actor_id, *worker_id);
980                            actors.push(StreamActor {
981                                actor_id,
982                                fragment_id: new_fragment_id,
983                                vnode_bitmap: vnode_bitmap.clone(),
984                                mview_definition: String::new(),
985                                expr_context: Some(sink_ctx.ctx.to_expr_context()),
986                                config_override: sink_ctx.ctx.config_override.clone(),
987                            });
988                        }
989                        render_result.stream_actors.insert(new_fragment_id, actors);
990                    }
991                }
992
993                // Build edges first (needed for no-shuffle mapping used in split resolution)
994                let mut edges = self.database_info.build_edge(
995                    None,
996                    Some(&plan),
997                    None,
998                    partial_graph_manager.control_stream_manager(),
999                    &render_result.stream_actors,
1000                    &render_result.actor_location,
1001                );
1002
1003                // Phase 2: Resolve splits to actor-level assignment.
1004                let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1005                    .stream_actors
1006                    .iter()
1007                    .map(|(fragment_id, actors)| {
1008                        (
1009                            *fragment_id,
1010                            actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1011                        )
1012                    })
1013                    .collect();
1014                let resolved_split_assignment = match &plan.split_plan {
1015                    ReplaceJobSplitPlan::Discovered(discovered) => {
1016                        SourceManager::resolve_fragment_to_actor_splits(
1017                            &plan.new_fragments,
1018                            discovered,
1019                            &fragment_actor_ids,
1020                        )?
1021                    }
1022                    ReplaceJobSplitPlan::AlignFromPrevious => {
1023                        SourceManager::resolve_replace_source_splits(
1024                            &plan.new_fragments,
1025                            &plan.replace_upstream,
1026                            edges.actor_new_no_shuffle(),
1027                            |_fragment_id, actor_id| {
1028                                self.database_info.fragment_infos().find_map(|fragment| {
1029                                    fragment
1030                                        .actors
1031                                        .get(&actor_id)
1032                                        .map(|info| info.splits.clone())
1033                                })
1034                            },
1035                        )?
1036                    }
1037                };
1038
1039                // Pre-apply: add new fragments and replace upstream
1040                self.database_info.pre_apply_new_fragments(
1041                    plan.new_fragments
1042                        .new_fragment_info(
1043                            &render_result.stream_actors,
1044                            &render_result.actor_location,
1045                            &resolved_split_assignment,
1046                        )
1047                        .map(|(fragment_id, new_fragment)| {
1048                            (fragment_id, plan.streaming_job.id(), new_fragment)
1049                        }),
1050                );
1051                for (fragment_id, replace_map) in &plan.replace_upstream {
1052                    self.database_info
1053                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1054                }
1055                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1056                    self.database_info
1057                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
1058                            (
1059                                sink.new_fragment.fragment_id,
1060                                sink.original_sink.id.as_job_id(),
1061                                sink.new_fragment_info(
1062                                    &render_result.stream_actors,
1063                                    &render_result.actor_location,
1064                                ),
1065                            )
1066                        }));
1067                }
1068
1069                let (table_ids, node_actors) = self.collect_base_info();
1070
1071                // Actors to create
1072                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1073                    &plan,
1074                    &mut edges,
1075                    &self.database_info,
1076                    &render_result.stream_actors,
1077                    &render_result.actor_location,
1078                ));
1079
1080                // Mutation (must be generated before removing old fragments,
1081                // because it reads actor info from database_info)
1082                let mutation = Command::replace_stream_job_to_mutation(
1083                    &plan,
1084                    &mut edges,
1085                    &mut self.database_info,
1086                    &resolved_split_assignment,
1087                )?;
1088
1089                // Post-apply: remove old fragments
1090                {
1091                    let mut fragment_ids_to_remove: Vec<_> = plan
1092                        .old_fragments
1093                        .fragments
1094                        .values()
1095                        .map(|f| f.fragment_id)
1096                        .collect();
1097                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1098                        fragment_ids_to_remove
1099                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1100                    }
1101                    self.database_info
1102                        .post_apply_remove_fragments(fragment_ids_to_remove);
1103                }
1104
1105                (
1106                    mutation,
1107                    table_ids,
1108                    actors_to_create,
1109                    node_actors,
1110                    PostCollectCommand::ReplaceStreamJob {
1111                        plan,
1112                        resolved_split_assignment,
1113                    },
1114                )
1115            }
1116
1117            Some(Command::SourceChangeSplit(split_state)) => {
1118                // Pre-apply: split assignments
1119                self.database_info.pre_apply_split_assignments(
1120                    split_state
1121                        .split_assignment
1122                        .iter()
1123                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1124                );
1125
1126                let mutation = Some(Command::source_change_split_to_mutation(
1127                    &split_state.split_assignment,
1128                ));
1129                let (table_ids, node_actors) = self.collect_base_info();
1130                (
1131                    mutation,
1132                    table_ids,
1133                    None,
1134                    node_actors,
1135                    PostCollectCommand::SourceChangeSplit {
1136                        split_assignment: split_state.split_assignment,
1137                    },
1138                )
1139            }
1140
1141            Some(Command::CreateSubscription {
1142                subscription_id,
1143                upstream_mv_table_id,
1144                retention_second,
1145            }) => {
1146                self.database_info.register_subscriber(
1147                    upstream_mv_table_id.as_job_id(),
1148                    subscription_id.as_subscriber_id(),
1149                    SubscriberType::Subscription(retention_second),
1150                );
1151                let mutation = Some(Command::create_subscription_to_mutation(
1152                    upstream_mv_table_id,
1153                    subscription_id,
1154                ));
1155                let (table_ids, node_actors) = self.collect_base_info();
1156                (
1157                    mutation,
1158                    table_ids,
1159                    None,
1160                    node_actors,
1161                    PostCollectCommand::CreateSubscription { subscription_id },
1162                )
1163            }
1164
1165            Some(Command::DropSubscription {
1166                subscription_id,
1167                upstream_mv_table_id,
1168            }) => {
1169                if self
1170                    .database_info
1171                    .unregister_subscriber(
1172                        upstream_mv_table_id.as_job_id(),
1173                        subscription_id.as_subscriber_id(),
1174                    )
1175                    .is_none()
1176                {
1177                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1178                }
1179                let mutation = Some(Command::drop_subscription_to_mutation(
1180                    upstream_mv_table_id,
1181                    subscription_id,
1182                ));
1183                let (table_ids, node_actors) = self.collect_base_info();
1184                (
1185                    mutation,
1186                    table_ids,
1187                    None,
1188                    node_actors,
1189                    PostCollectCommand::Command("DropSubscription".to_owned()),
1190                )
1191            }
1192
1193            Some(Command::AlterSubscriptionRetention {
1194                subscription_id,
1195                upstream_mv_table_id,
1196                retention_second,
1197            }) => {
1198                self.database_info.update_subscription_retention(
1199                    upstream_mv_table_id.as_job_id(),
1200                    subscription_id.as_subscriber_id(),
1201                    retention_second,
1202                );
1203                self.apply_simple_command(None, "AlterSubscriptionRetention")
1204            }
1205
1206            Some(Command::ConnectorPropsChange(config)) => {
1207                let mutation = Some(Command::connector_props_change_to_mutation(&config));
1208                let (table_ids, node_actors) = self.collect_base_info();
1209                (
1210                    mutation,
1211                    table_ids,
1212                    None,
1213                    node_actors,
1214                    PostCollectCommand::ConnectorPropsChange(config),
1215                )
1216            }
1217
1218            Some(Command::Refresh {
1219                table_id,
1220                associated_source_id,
1221            }) => {
1222                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1223                self.apply_simple_command(mutation, "Refresh")
1224            }
1225
1226            Some(Command::ListFinish {
1227                table_id: _,
1228                associated_source_id,
1229            }) => {
1230                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1231                self.apply_simple_command(mutation, "ListFinish")
1232            }
1233
1234            Some(Command::LoadFinish {
1235                table_id: _,
1236                associated_source_id,
1237            }) => {
1238                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1239                self.apply_simple_command(mutation, "LoadFinish")
1240            }
1241
1242            Some(Command::ResetSource { source_id }) => {
1243                let mutation = Some(Command::reset_source_to_mutation(source_id));
1244                self.apply_simple_command(mutation, "ResetSource")
1245            }
1246
1247            Some(Command::ResumeBackfill { target }) => {
1248                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1249                let (table_ids, node_actors) = self.collect_base_info();
1250                (
1251                    mutation,
1252                    table_ids,
1253                    None,
1254                    node_actors,
1255                    PostCollectCommand::ResumeBackfill { target },
1256                )
1257            }
1258
1259            Some(Command::InjectSourceOffsets {
1260                source_id,
1261                split_offsets,
1262            }) => {
1263                let mutation = Some(Command::inject_source_offsets_to_mutation(
1264                    source_id,
1265                    &split_offsets,
1266                ));
1267                self.apply_simple_command(mutation, "InjectSourceOffsets")
1268            }
1269        };
1270
1271        let mut finished_snapshot_backfill_jobs = HashSet::new();
1272        let mutation = match mutation {
1273            Some(mutation) => Some(mutation),
1274            None => {
1275                let mut finished_snapshot_backfill_job_info = HashMap::new();
1276                if barrier_info.kind.is_checkpoint() {
1277                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
1278                        if creating_job.should_merge_to_upstream() {
1279                            let info = creating_job
1280                                .start_consume_upstream(partial_graph_manager, barrier_info)?;
1281                            finished_snapshot_backfill_job_info
1282                                .try_insert(job_id, info)
1283                                .expect("non-duplicated");
1284                        }
1285                    }
1286                }
1287
1288                if !finished_snapshot_backfill_job_info.is_empty() {
1289                    let actors_to_create = actors_to_create.get_or_insert_default();
1290                    let mut subscriptions_to_drop = vec![];
1291                    let mut dispatcher_update = vec![];
1292                    let mut actor_splits = HashMap::new();
1293                    for (job_id, info) in finished_snapshot_backfill_job_info {
1294                        finished_snapshot_backfill_jobs.insert(job_id);
1295                        subscriptions_to_drop.extend(
1296                            info.snapshot_backfill_upstream_tables.iter().map(
1297                                |upstream_table_id| PbSubscriptionUpstreamInfo {
1298                                    subscriber_id: job_id.as_subscriber_id(),
1299                                    upstream_mv_table_id: *upstream_table_id,
1300                                },
1301                            ),
1302                        );
1303                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1304                            assert_matches!(
1305                                self.database_info.unregister_subscriber(
1306                                    upstream_mv_table_id.as_job_id(),
1307                                    job_id.as_subscriber_id()
1308                                ),
1309                                Some(SubscriberType::SnapshotBackfill)
1310                            );
1311                        }
1312
1313                        table_ids_to_commit.extend(
1314                            info.fragment_infos
1315                                .values()
1316                                .flat_map(|fragment| fragment.state_table_ids.iter())
1317                                .copied(),
1318                        );
1319
1320                        let actor_len = info
1321                            .fragment_infos
1322                            .values()
1323                            .map(|fragment| fragment.actors.len() as u64)
1324                            .sum();
1325                        let id_gen = GlobalActorIdGen::new(
1326                            partial_graph_manager
1327                                .control_stream_manager()
1328                                .env
1329                                .actor_id_generator(),
1330                            actor_len,
1331                        );
1332                        let mut next_local_actor_id = 0;
1333                        // mapping from old_actor_id to new_actor_id
1334                        let actor_mapping: HashMap<_, _> = info
1335                            .fragment_infos
1336                            .values()
1337                            .flat_map(|fragment| fragment.actors.keys())
1338                            .map(|old_actor_id| {
1339                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1340                                next_local_actor_id += 1;
1341                                (*old_actor_id, new_actor_id.as_global_id())
1342                            })
1343                            .collect();
1344                        let actor_mapping = &actor_mapping;
1345                        let new_stream_actors: HashMap<_, _> = info
1346                            .stream_actors
1347                            .into_iter()
1348                            .map(|(old_actor_id, mut actor)| {
1349                                let new_actor_id = actor_mapping[&old_actor_id];
1350                                actor.actor_id = new_actor_id;
1351                                (new_actor_id, actor)
1352                            })
1353                            .collect();
1354                        let new_fragment_info: HashMap<_, _> = info
1355                            .fragment_infos
1356                            .into_iter()
1357                            .map(|(fragment_id, mut fragment)| {
1358                                let actors = take(&mut fragment.actors);
1359                                fragment.actors = actors
1360                                    .into_iter()
1361                                    .map(|(old_actor_id, actor)| {
1362                                        let new_actor_id = actor_mapping[&old_actor_id];
1363                                        (new_actor_id, actor)
1364                                    })
1365                                    .collect();
1366                                (fragment_id, fragment)
1367                            })
1368                            .collect();
1369                        actor_splits.extend(
1370                            new_fragment_info
1371                                .values()
1372                                .flat_map(|fragment| &fragment.actors)
1373                                .map(|(actor_id, actor)| {
1374                                    (
1375                                        *actor_id,
1376                                        ConnectorSplits {
1377                                            splits: actor
1378                                                .splits
1379                                                .iter()
1380                                                .map(ConnectorSplit::from)
1381                                                .collect(),
1382                                        },
1383                                    )
1384                                }),
1385                        );
1386                        // new actors belong to the database partial graph
1387                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
1388                        let mut edge_builder = FragmentEdgeBuilder::new(
1389                            info.upstream_fragment_downstreams
1390                                .keys()
1391                                .map(|upstream_fragment_id| {
1392                                    self.database_info.fragment(*upstream_fragment_id)
1393                                })
1394                                .chain(new_fragment_info.values())
1395                                .map(|fragment| {
1396                                    (
1397                                        fragment.fragment_id,
1398                                        EdgeBuilderFragmentInfo::from_inflight(
1399                                            fragment,
1400                                            partial_graph_id,
1401                                            partial_graph_manager.control_stream_manager(),
1402                                        ),
1403                                    )
1404                                }),
1405                        );
1406                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
1407                        edge_builder.add_relations(&info.downstreams);
1408                        let mut edges = edge_builder.build();
1409                        let new_actors_to_create = edges.collect_actors_to_create(
1410                            new_fragment_info.values().map(|fragment| {
1411                                (
1412                                    fragment.fragment_id,
1413                                    &fragment.nodes,
1414                                    fragment.actors.iter().map(|(actor_id, actor)| {
1415                                        (&new_stream_actors[actor_id], actor.worker_id)
1416                                    }),
1417                                    [], // no initial subscriber for backfilling job
1418                                )
1419                            }),
1420                        );
1421                        dispatcher_update.extend(
1422                            info.upstream_fragment_downstreams.keys().flat_map(
1423                                |upstream_fragment_id| {
1424                                    let new_actor_dispatchers = edges
1425                                        .dispatchers
1426                                        .remove(upstream_fragment_id)
1427                                        .expect("should exist");
1428                                    new_actor_dispatchers.into_iter().flat_map(
1429                                        |(upstream_actor_id, dispatchers)| {
1430                                            dispatchers.into_iter().map(move |dispatcher| {
1431                                                PbDispatcherUpdate {
1432                                                    actor_id: upstream_actor_id,
1433                                                    dispatcher_id: dispatcher.dispatcher_id,
1434                                                    hash_mapping: dispatcher.hash_mapping,
1435                                                    removed_downstream_actor_id: dispatcher
1436                                                        .downstream_actor_id
1437                                                        .iter()
1438                                                        .map(|new_downstream_actor_id| {
1439                                                            actor_mapping
1440                                                            .iter()
1441                                                            .find_map(
1442                                                                |(old_actor_id, new_actor_id)| {
1443                                                                    (new_downstream_actor_id
1444                                                                        == new_actor_id)
1445                                                                        .then_some(*old_actor_id)
1446                                                                },
1447                                                            )
1448                                                            .expect("should exist")
1449                                                        })
1450                                                        .collect(),
1451                                                    added_downstream_actor_id: dispatcher
1452                                                        .downstream_actor_id,
1453                                                }
1454                                            })
1455                                        },
1456                                    )
1457                                },
1458                            ),
1459                        );
1460                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1461                        for (worker_id, worker_actors) in new_actors_to_create {
1462                            node_actors.entry(worker_id).or_default().extend(
1463                                worker_actors.values().flat_map(|(_, actors, _)| {
1464                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
1465                                }),
1466                            );
1467                            actors_to_create
1468                                .entry(worker_id)
1469                                .or_default()
1470                                .extend(worker_actors);
1471                        }
1472                        self.database_info.add_existing(InflightStreamingJobInfo {
1473                            job_id,
1474                            fragment_infos: new_fragment_info,
1475                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
1476                            status: CreateStreamingJobStatus::Created,
1477                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
1478                        });
1479                    }
1480
1481                    Some(PbMutation::Update(PbUpdateMutation {
1482                        dispatcher_update,
1483                        merge_update: vec![], // no upstream update on existing actors
1484                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
1485                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
1486                        actor_splits,
1487                        actor_new_dispatchers: Default::default(), // no new dispatcher
1488                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
1489                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
1490                        subscriptions_to_drop,
1491                    }))
1492                } else {
1493                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
1494                    if fragment_ids.is_empty() {
1495                        None
1496                    } else {
1497                        Some(PbMutation::StartFragmentBackfill(
1498                            PbStartFragmentBackfillMutation { fragment_ids },
1499                        ))
1500                    }
1501                }
1502            }
1503        };
1504
1505        // Forward barrier to creating streaming job controls
1506        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
1507            if !finished_snapshot_backfill_jobs.contains(job_id) {
1508                let throttle_mutation = if let Some((ref jobs, ref config)) =
1509                    throttle_for_creating_jobs
1510                    && jobs.contains(job_id)
1511                {
1512                    assert_eq!(
1513                        jobs.len(),
1514                        1,
1515                        "should not alter rate limit of snapshot backfill job with other jobs"
1516                    );
1517                    Some((
1518                        Mutation::Throttle(ThrottleMutation {
1519                            fragment_throttle: config
1520                                .iter()
1521                                .map(|(fragment_id, config)| (*fragment_id, *config))
1522                                .collect(),
1523                        }),
1524                        take(notifiers),
1525                    ))
1526                } else {
1527                    None
1528                };
1529                creating_job.on_new_upstream_barrier(
1530                    partial_graph_manager,
1531                    barrier_info,
1532                    throttle_mutation,
1533                )?;
1534            }
1535        }
1536
1537        partial_graph_manager.inject_barrier(
1538            to_partial_graph_id(self.database_id, None),
1539            mutation,
1540            barrier_info,
1541            &node_actors,
1542            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1543            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1544            actors_to_create,
1545        )?;
1546
1547        Ok(ApplyCommandInfo {
1548            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
1549            table_ids_to_commit,
1550            jobs_to_wait: finished_snapshot_backfill_jobs,
1551            command: post_collect_command,
1552        })
1553    }
1554}