risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::system_param::AdaptiveParallelismStrategy;
27use risingwave_common::util::epoch::Epoch;
28use risingwave_meta_model::fragment::DistributionType;
29use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
30use risingwave_pb::common::WorkerNode;
31use risingwave_pb::hummock::HummockVersionStats;
32use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
33use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
34use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
35use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
36use risingwave_pb::stream_plan::{
37    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
38    PbUpstreamSinkInfo, ThrottleMutation,
39};
40use tracing::warn;
41
42use crate::MetaResult;
43use crate::barrier::cdc_progress::CdcTableBackfillTracker;
44use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
45use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
46use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
47use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
48use crate::barrier::info::{
49    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
50    SubscriberType,
51};
52use crate::barrier::notifier::Notifier;
53use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
54use crate::barrier::rpc::to_partial_graph_id;
55use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
56use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
57use crate::controller::scale::{
58    ComponentFragmentAligner, EnsembleActorTemplate, NoShuffleEnsemble,
59    build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
60};
61use crate::model::{
62    ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
63    StreamJobActorsToCreate, StreamJobFragmentsToCreate,
64};
65use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
66use crate::stream::{
67    GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
68    fill_snapshot_backfill_epoch,
69};
70
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers, accumulated by
    /// `next_barrier_info` and drained into `BarrierKind::Checkpoint` when a
    /// checkpoint barrier is issued.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused. Read via `is_paused()`; transitions are
    /// logged in `set_is_paused`.
    is_paused: bool,
}
85
86impl BarrierWorkerState {
87    pub(super) fn new() -> Self {
88        Self {
89            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
90            pending_non_checkpoint_barriers: vec![],
91            is_paused: false,
92        }
93    }
94
95    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
96        Self {
97            in_flight_prev_epoch,
98            pending_non_checkpoint_barriers: vec![],
99            is_paused,
100        }
101    }
102
103    pub fn is_paused(&self) -> bool {
104        self.is_paused
105    }
106
107    fn set_is_paused(&mut self, is_paused: bool) {
108        if self.is_paused != is_paused {
109            tracing::info!(
110                currently_paused = self.is_paused,
111                newly_paused = is_paused,
112                "update paused state"
113            );
114            self.is_paused = is_paused;
115        }
116    }
117
118    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
119        &self.in_flight_prev_epoch
120    }
121
122    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
123    pub fn next_barrier_info(
124        &mut self,
125        is_checkpoint: bool,
126        curr_epoch: TracedEpoch,
127    ) -> BarrierInfo {
128        assert!(
129            self.in_flight_prev_epoch.value() < curr_epoch.value(),
130            "curr epoch regress. {} > {}",
131            self.in_flight_prev_epoch.value(),
132            curr_epoch.value()
133        );
134        let prev_epoch = self.in_flight_prev_epoch.clone();
135        self.in_flight_prev_epoch = curr_epoch.clone();
136        self.pending_non_checkpoint_barriers
137            .push(prev_epoch.value().0);
138        let kind = if is_checkpoint {
139            let epochs = take(&mut self.pending_non_checkpoint_barriers);
140            BarrierKind::Checkpoint(epochs)
141        } else {
142            BarrierKind::Barrier
143        };
144        BarrierInfo {
145            prev_epoch,
146            curr_epoch,
147            kind,
148        }
149    }
150}
151
/// Extra information returned by `apply_command` to its caller.
pub(super) struct ApplyCommandInfo {
    /// IDs of creating streaming jobs to wait for.
    /// NOTE(review): the exact wait semantics are not visible in this chunk —
    /// confirm against the callers of `apply_command`.
    pub jobs_to_wait: HashSet<JobId>,
}
155
/// Result tuple produced by the command-dispatch `match` in `apply_command`:
/// (mutation to inject with the barrier, table IDs to commit at this barrier,
/// actors to create, per-worker actor IDs to collect, post-collect command).
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
165
/// Result of actor rendering for a create/replace streaming job.
/// Produced by `render_actors`.
pub(super) struct RenderResult {
    /// Rendered actors grouped by fragment.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker placement for each rendered actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
173
174/// Derive `NoShuffle` edges from fragment downstream relations and resolve ensembles.
175///
176/// This scans both the internal downstream relations (`fragments.downstreams`) and
177/// the cross-boundary upstream-to-new-fragment relations (`upstream_fragment_downstreams`)
178/// to find all `NoShuffle` edges. It then runs BFS to find connected components (ensembles)
179/// and categorizes them into:
180/// - Ensembles whose entry fragments include existing (non-new) fragments
181/// - Ensembles whose entry fragments are all newly created
182fn resolve_no_shuffle_ensembles(
183    fragments: &StreamJobFragmentsToCreate,
184    upstream_fragment_downstreams: &FragmentDownstreamRelation,
185) -> MetaResult<Vec<NoShuffleEnsemble>> {
186    // Derive FragmentNewNoShuffle from the two downstream relation maps.
187    let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
188
189    // Internal edges (new → new) and edges from new → existing downstream (replace job).
190    for (upstream_fid, relations) in &fragments.downstreams {
191        for rel in relations {
192            if rel.dispatcher_type == DispatcherType::NoShuffle {
193                new_no_shuffle
194                    .entry(*upstream_fid)
195                    .or_default()
196                    .insert(rel.downstream_fragment_id);
197            }
198        }
199    }
200
201    // Cross-boundary edges: existing upstream → new downstream.
202    for (upstream_fid, relations) in upstream_fragment_downstreams {
203        for rel in relations {
204            if rel.dispatcher_type == DispatcherType::NoShuffle {
205                new_no_shuffle
206                    .entry(*upstream_fid)
207                    .or_default()
208                    .insert(rel.downstream_fragment_id);
209            }
210        }
211    }
212
213    let mut ensembles = if new_no_shuffle.is_empty() {
214        Vec::new()
215    } else {
216        // Flatten into directed edge pairs for BFS.
217        let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
218            .iter()
219            .flat_map(|(upstream_fid, downstream_fids)| {
220                downstream_fids
221                    .iter()
222                    .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
223            })
224            .collect();
225
226        let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
227            .iter()
228            .flat_map(|(u, d)| [*u, *d])
229            .collect::<HashSet<_>>()
230            .into_iter()
231            .collect();
232
233        let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
234        find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
235    };
236
237    // Add standalone fragments (not covered by any ensemble) as single-fragment ensembles.
238    let covered: HashSet<FragmentId> = ensembles
239        .iter()
240        .flat_map(|e| e.component_fragments())
241        .collect();
242    for fragment_id in fragments.inner.fragments.keys() {
243        if !covered.contains(fragment_id) {
244            ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
245        }
246    }
247
248    Ok(ensembles)
249}
250
/// Render actors for a create or replace streaming job.
///
/// This determines the parallelism for each no-shuffle ensemble (either from an existing
/// inflight upstream or computed fresh), and produces `StreamActor` instances with worker
/// placements and actor-level no-shuffle mappings.
///
/// The process follows three steps:
/// 1. For each ensemble, resolve `EnsembleActorTemplate` (from existing or fresh).
/// 2. For each new component fragment, allocate actor IDs and compute worker/vnode assignments.
/// 3. Expand the simple assignments into full `StreamActor` structures.
///
/// Only fragments present in `fragments` (i.e. the newly created ones) have actors
/// rendered; existing fragments in an ensemble only contribute their placement as a
/// template. Returns the rendered actors plus each actor's worker placement.
fn render_actors(
    fragments: &StreamJobFragmentsToCreate,
    database_info: &InflightDatabaseInfo,
    definition: &str,
    ctx: &StreamContext,
    streaming_job_model: &streaming_job::Model,
    actor_id_counter: &AtomicU32,
    worker_map: &HashMap<WorkerId, WorkerNode>,
    adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ensembles: &[NoShuffleEnsemble],
    database_resource_group: &str,
) -> MetaResult<RenderResult> {
    // Steps 1 & 2: resolve a placement template per ensemble, then render actors for
    // each new fragment as a simple assignment: actor_id -> (worker_id, vnode_bitmap).
    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
        HashMap::new();

    for ensemble in ensembles {
        // Determine the EnsembleActorTemplate for this ensemble.
        //
        // Check if any component fragment in the ensemble already exists (i.e. is inflight,
        // not part of the fragments being created). If so, derive the actor assignment from
        // an existing fragment. Otherwise render fresh.
        let existing_fragment_ids: Vec<FragmentId> = ensemble
            .component_fragments()
            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
            .collect();

        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
                database_info.fragment(first_existing),
            );

            // Sanity check: all existing fragments in the same ensemble must be aligned —
            // same actor count and same worker placement per vnode.
            for &other_fragment_id in &existing_fragment_ids[1..] {
                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
                    database_info.fragment(other_fragment_id),
                );
                template.assert_aligned_with(&other, first_existing, other_fragment_id);
            }

            template
        } else {
            // All fragments are new — render from scratch based on the first component's
            // distribution type and vnode count.
            let first_component = ensemble
                .component_fragments()
                .next()
                .expect("ensemble must have at least one component");
            let fragment = &fragments.inner.fragments[&first_component];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let vnode_count = fragment.vnode_count();

            // Assert all component fragments in this ensemble share the same vnode count.
            // (In this branch every component is known to be in `fragments`, so indexing
            // cannot miss.)
            for fragment_id in ensemble.component_fragments() {
                let f = &fragments.inner.fragments[&fragment_id];
                assert_eq!(
                    vnode_count,
                    f.vnode_count(),
                    "component fragments {} and {} in the same no-shuffle ensemble have \
                     different vnode counts: {} vs {}",
                    first_component,
                    fragment_id,
                    vnode_count,
                    f.vnode_count(),
                );
            }

            EnsembleActorTemplate::render_new(
                streaming_job_model,
                worker_map,
                adaptive_parallelism_strategy,
                None,
                database_resource_group.to_owned(),
                distribution_type,
                vnode_count,
            )?
        };

        // Render each new component fragment in this ensemble against the template,
        // allocating fresh actor IDs from the shared counter.
        for fragment_id in ensemble.component_fragments() {
            if !fragments.inner.fragments.contains_key(&fragment_id) {
                continue; // Skip existing fragments.
            }
            let fragment = &fragments.inner.fragments[&fragment_id];
            let distribution_type: DistributionType = fragment.distribution_type.into();
            let aligner =
                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
            let assignments = aligner.align_component_actor(distribution_type);
            actor_assignments.insert(fragment_id, assignments);
        }
    }

    // Step 3: Expand simple assignments into full StreamActor structures,
    // recording each actor's worker placement along the way.
    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();

    for (fragment_id, assignments) in &actor_assignments {
        let mut actors = Vec::with_capacity(assignments.len());
        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
            result_actor_location.insert(actor_id, *worker_id);
            actors.push(StreamActor {
                actor_id,
                fragment_id: *fragment_id,
                vnode_bitmap: vnode_bitmap.clone(),
                mview_definition: definition.to_owned(),
                expr_context: Some(ctx.to_expr_context()),
                config_override: ctx.config_override.clone(),
            });
        }
        result_stream_actors.insert(*fragment_id, actors);
    }

    Ok(RenderResult {
        stream_actors: result_stream_actors,
        actor_location: result_actor_location,
    })
}
378impl DatabaseCheckpointControl {
379    /// Collect table IDs to commit and actor IDs to collect from current fragment infos.
380    fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
381        let table_ids_to_commit = self.database_info.existing_table_ids().collect();
382        let node_actors =
383            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
384        (table_ids_to_commit, node_actors)
385    }
386
387    /// Helper for the simplest command variants: those that only need a
388    /// pre-computed mutation and a command name, with no actors to create
389    /// and no additional side effects on `self`.
390    fn apply_simple_command(
391        &self,
392        mutation: Option<Mutation>,
393        command_name: &'static str,
394    ) -> ApplyCommandResult {
395        let (table_ids, node_actors) = self.collect_base_info();
396        (
397            mutation,
398            table_ids,
399            None,
400            node_actors,
401            PostCollectCommand::Command(command_name.to_owned()),
402        )
403    }
404
405    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
406    /// will be removed from the state after the info get resolved.
407    pub(super) fn apply_command(
408        &mut self,
409        command: Option<Command>,
410        notifiers: &mut Vec<Notifier>,
411        barrier_info: BarrierInfo,
412        partial_graph_manager: &mut PartialGraphManager,
413        hummock_version_stats: &HummockVersionStats,
414        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
415        worker_nodes: &HashMap<WorkerId, WorkerNode>,
416    ) -> MetaResult<ApplyCommandInfo> {
417        debug_assert!(
418            !matches!(
419                command,
420                Some(Command::RescheduleIntent {
421                    reschedule_plan: None,
422                    ..
423                })
424            ),
425            "reschedule intent must be resolved before apply"
426        );
427        if matches!(
428            command,
429            Some(Command::RescheduleIntent {
430                reschedule_plan: None,
431                ..
432            })
433        ) {
434            bail!("reschedule intent must be resolved before apply");
435        }
436
437        /// Resolve source splits for a create streaming job command.
438        ///
439        /// Combines source fragment split resolution and backfill split alignment
440        /// into one step, looking up existing upstream actor splits from the inflight database info.
441        fn resolve_source_splits(
442            info: &CreateStreamingJobCommandInfo,
443            render_result: &RenderResult,
444            actor_no_shuffle: &ActorNewNoShuffle,
445            database_info: &InflightDatabaseInfo,
446        ) -> MetaResult<SplitAssignment> {
447            let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
448                .stream_actors
449                .iter()
450                .map(|(fragment_id, actors)| {
451                    (
452                        *fragment_id,
453                        actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
454                    )
455                })
456                .collect();
457            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
458                &info.stream_job_fragments,
459                &info.init_split_assignment,
460                &fragment_actor_ids,
461            )?;
462            resolved.extend(SourceManager::resolve_backfill_splits(
463                &info.stream_job_fragments,
464                actor_no_shuffle,
465                |fragment_id, actor_id| {
466                    database_info
467                        .fragment(fragment_id)
468                        .actors
469                        .get(&actor_id)
470                        .map(|info| info.splits.clone())
471                },
472            )?);
473            Ok(resolved)
474        }
475
476        // Throttle data for creating jobs (set only in the Throttle arm)
477        let mut throttle_for_creating_jobs: Option<(
478            HashSet<JobId>,
479            HashMap<FragmentId, ThrottleConfig>,
480        )> = None;
481
482        // Each variant handles its own pre-apply, edge building, mutation generation,
483        // collect base info, and post-apply. The match produces values consumed by the
484        // common snapshot-backfill-merging code that follows.
485        let (
486            mutation,
487            mut table_ids_to_commit,
488            mut actors_to_create,
489            mut node_actors,
490            post_collect_command,
491        ) = match command {
492            None => self.apply_simple_command(None, "barrier"),
493            Some(Command::CreateStreamingJob {
494                mut info,
495                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
496                cross_db_snapshot_backfill_info,
497            }) => {
498                let ensembles = resolve_no_shuffle_ensembles(
499                    &info.stream_job_fragments,
500                    &info.upstream_fragment_downstreams,
501                )?;
502                let actors = render_actors(
503                    &info.stream_job_fragments,
504                    &self.database_info,
505                    &info.definition,
506                    &info.stream_job_fragments.inner.ctx,
507                    &info.streaming_job_model,
508                    partial_graph_manager
509                        .control_stream_manager()
510                        .env
511                        .actor_id_generator(),
512                    worker_nodes,
513                    adaptive_parallelism_strategy,
514                    &ensembles,
515                    &info.database_resource_group,
516                )?;
517                {
518                    assert!(!self.state.is_paused());
519                    let snapshot_epoch = barrier_info.prev_epoch();
520                    // set snapshot epoch of upstream table for snapshot backfill
521                    for snapshot_backfill_epoch in snapshot_backfill_info
522                        .upstream_mv_table_id_to_backfill_epoch
523                        .values_mut()
524                    {
525                        assert_eq!(
526                            snapshot_backfill_epoch.replace(snapshot_epoch),
527                            None,
528                            "must not set previously"
529                        );
530                    }
531                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
532                        fill_snapshot_backfill_epoch(
533                            &mut fragment.nodes,
534                            Some(&snapshot_backfill_info),
535                            &cross_db_snapshot_backfill_info,
536                        )?;
537                    }
538                    let job_id = info.stream_job_fragments.stream_job_id();
539                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
540                        .upstream_mv_table_id_to_backfill_epoch
541                        .keys()
542                        .cloned()
543                        .collect();
544
545                    // Build edges first (needed for no-shuffle mapping used in split resolution)
546                    let mut edges = self.database_info.build_edge(
547                        Some((&info, true)),
548                        None,
549                        None,
550                        partial_graph_manager.control_stream_manager(),
551                        &actors.stream_actors,
552                        &actors.actor_location,
553                    );
554                    // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
555                    let resolved_split_assignment = resolve_source_splits(
556                        &info,
557                        &actors,
558                        edges.actor_new_no_shuffle(),
559                        &self.database_info,
560                    )?;
561
562                    let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
563                    else {
564                        panic!("duplicated creating snapshot backfill job {job_id}");
565                    };
566
567                    let job = CreatingStreamingJobControl::new(
568                        entry,
569                        CreateSnapshotBackfillJobCommandInfo {
570                            info: info.clone(),
571                            snapshot_backfill_info: snapshot_backfill_info.clone(),
572                            cross_db_snapshot_backfill_info,
573                            resolved_split_assignment: resolved_split_assignment.clone(),
574                        },
575                        take(notifiers),
576                        snapshot_backfill_upstream_tables,
577                        snapshot_epoch,
578                        hummock_version_stats,
579                        partial_graph_manager,
580                        &mut edges,
581                        &resolved_split_assignment,
582                        &actors,
583                    )?;
584
585                    self.database_info
586                        .shared_actor_infos
587                        .upsert(self.database_id, job.fragment_infos_with_job_id());
588
589                    for upstream_mv_table_id in snapshot_backfill_info
590                        .upstream_mv_table_id_to_backfill_epoch
591                        .keys()
592                    {
593                        self.database_info.register_subscriber(
594                            upstream_mv_table_id.as_job_id(),
595                            info.streaming_job.id().as_subscriber_id(),
596                            SubscriberType::SnapshotBackfill,
597                        );
598                    }
599
600                    let mutation = Command::create_streaming_job_to_mutation(
601                        &info,
602                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
603                        self.state.is_paused(),
604                        &mut edges,
605                        partial_graph_manager.control_stream_manager(),
606                        None,
607                        &resolved_split_assignment,
608                        &actors.stream_actors,
609                        &actors.actor_location,
610                    )?;
611
612                    let (table_ids, node_actors) = self.collect_base_info();
613                    (
614                        Some(mutation),
615                        table_ids,
616                        None,
617                        node_actors,
618                        PostCollectCommand::barrier(),
619                    )
620                }
621            }
622            Some(Command::CreateStreamingJob {
623                mut info,
624                job_type,
625                cross_db_snapshot_backfill_info,
626            }) => {
627                let ensembles = resolve_no_shuffle_ensembles(
628                    &info.stream_job_fragments,
629                    &info.upstream_fragment_downstreams,
630                )?;
631                let actors = render_actors(
632                    &info.stream_job_fragments,
633                    &self.database_info,
634                    &info.definition,
635                    &info.stream_job_fragments.inner.ctx,
636                    &info.streaming_job_model,
637                    partial_graph_manager
638                        .control_stream_manager()
639                        .env
640                        .actor_id_generator(),
641                    worker_nodes,
642                    adaptive_parallelism_strategy,
643                    &ensembles,
644                    &info.database_resource_group,
645                )?;
646                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
647                    fill_snapshot_backfill_epoch(
648                        &mut fragment.nodes,
649                        None,
650                        &cross_db_snapshot_backfill_info,
651                    )?;
652                }
653
654                // Build edges
655                let new_upstream_sink =
656                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
657                        Some(ctx)
658                    } else {
659                        None
660                    };
661
662                let mut edges = self.database_info.build_edge(
663                    Some((&info, false)),
664                    None,
665                    new_upstream_sink,
666                    partial_graph_manager.control_stream_manager(),
667                    &actors.stream_actors,
668                    &actors.actor_location,
669                );
670                // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
671                let resolved_split_assignment = resolve_source_splits(
672                    &info,
673                    &actors,
674                    edges.actor_new_no_shuffle(),
675                    &self.database_info,
676                )?;
677
678                // Pre-apply: add new job and fragments
679                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
680                    let (fragment, _) =
                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
                            .expect("should have parallel cdc fragment");
                    Some(CdcTableBackfillTracker::new(
                        fragment.fragment_id,
                        splits.clone(),
                    ))
                } else {
                    None
                };
                // Pre-apply: register the new job (with its optional CDC backfill tracker)
                // and all of its fragments into the in-flight database info before the
                // barrier is injected.
                self.database_info
                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
                self.database_info.pre_apply_new_fragments(
                    info.stream_job_fragments
                        .new_fragment_info(
                            &actors.stream_actors,
                            &actors.actor_location,
                            &resolved_split_assignment,
                        )
                        .map(|(fragment_id, fragment_infos)| {
                            (fragment_id, info.streaming_job.id(), fragment_infos)
                        }),
                );
                // Sink-into-table: the target table's fragment gains the new sink
                // fragment as an additional upstream.
                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
                    self.database_info.pre_apply_add_node_upstream(
                        downstream_fragment_id,
                        &PbUpstreamSinkInfo {
                            upstream_fragment_id: ctx.sink_fragment_id,
                            sink_output_schema: ctx.sink_output_fields.clone(),
                            project_exprs: ctx.project_exprs.clone(),
                        },
                    );
                }

                // NOTE(review): collected after pre_apply so the new fragments are included.
                let (table_ids, node_actors) = self.collect_base_info();

                // Actors to create
                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
                    &info,
                    &mut edges,
                    &actors.stream_actors,
                    &actors.actor_location,
                ));

                // CDC table snapshot splits
                let actor_cdc_table_snapshot_splits = self
                    .database_info
                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;

                // Mutation
                let is_currently_paused = self.state.is_paused();
                let mutation = Command::create_streaming_job_to_mutation(
                    &info,
                    &job_type,
                    is_currently_paused,
                    &mut edges,
                    partial_graph_manager.control_stream_manager(),
                    actor_cdc_table_snapshot_splits,
                    &resolved_split_assignment,
                    &actors.stream_actors,
                    &actors.actor_location,
                )?;

                (
                    Some(mutation),
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::CreateStreamingJob {
                        info,
                        job_type,
                        cross_db_snapshot_backfill_info,
                        resolved_split_assignment,
                    },
                )
            }
757
            // Flush carries no mutation; the barrier itself is the point.
            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),

            Some(Command::Pause) => {
                // Record the previous pause state before flipping it; the mutation
                // encodes whether this barrier actually transitions the state.
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(true);
                let mutation = Command::pause_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Pause".to_owned()),
                )
            }

            Some(Command::Resume) => {
                // Mirror of Pause: flip the state and let the mutation reflect
                // whether anything actually changed.
                let prev_is_paused = self.state.is_paused();
                self.state.set_is_paused(false);
                let mutation = Command::resume_to_mutation(prev_is_paused);
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("Resume".to_owned()),
                )
            }

            Some(Command::Throttle { jobs, config }) => {
                let mutation = Some(Command::throttle_to_mutation(&config));
                // Stash (jobs, config) so throttling can also be applied to
                // still-creating jobs outside this match (handled later in this fn).
                throttle_for_creating_jobs = Some((jobs, config));
                self.apply_simple_command(mutation, "Throttle")
            }
793
            Some(Command::DropStreamingJobs {
                streaming_job_ids,
                unregistered_state_table_ids,
                unregistered_fragment_ids,
                dropped_sink_fragment_by_targets,
            }) => {
                // Collect every actor belonging to a fragment of one of the jobs
                // being dropped; these are the actors the mutation will stop.
                let actors = self
                    .database_info
                    .fragment_infos()
                    .filter(|fragment| {
                        self.database_info
                            .job_id_by_fragment(fragment.fragment_id)
                            .is_some_and(|job_id| streaming_job_ids.contains(&job_id))
                    })
                    .flat_map(|fragment| fragment.actors.keys().copied())
                    .collect::<Vec<_>>();

                // pre_apply: drop node upstream for sink targets
                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
                    self.database_info
                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
                }

                let (table_ids, node_actors) = self.collect_base_info();

                // post_apply: remove fragments
                self.database_info
                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());

                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
                    &actors,
                    &dropped_sink_fragment_by_targets,
                ));
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::DropStreamingJobs {
                        streaming_job_ids,
                        unregistered_state_table_ids,
                    },
                )
            }
838
            Some(Command::RescheduleIntent {
                reschedule_plan, ..
            }) => {
                let ReschedulePlan {
                    reschedules,
                    fragment_actors,
                } = reschedule_plan
                    .as_ref()
                    .expect("reschedule intent should be resolved in global barrier worker");

                // Pre-apply: reschedule fragments
                for (fragment_id, reschedule) in reschedules {
                    self.database_info.pre_apply_reschedule(
                        *fragment_id,
                        // Newly added actors with their worker placement, vnode
                        // bitmap and (possibly empty) split assignment.
                        reschedule
                            .added_actors
                            .iter()
                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
                                actors.iter().map(|actor_id| {
                                    (
                                        *actor_id,
                                        InflightActorInfo {
                                            worker_id: *node_id,
                                            // NOTE(review): `.0 .0` digs into nested tuple
                                            // layers of `newly_created_actors` values —
                                            // confirm against its declared type.
                                            vnode_bitmap: reschedule
                                                .newly_created_actors
                                                .get(actor_id)
                                                .expect("should exist")
                                                .0
                                                .0
                                                .vnode_bitmap
                                                .clone(),
                                            splits: reschedule
                                                .actor_splits
                                                .get(actor_id)
                                                .cloned()
                                                .unwrap_or_default(),
                                        },
                                    )
                                })
                            })
                            .collect(),
                        // Bitmap updates only for surviving actors; newly created
                        // ones already carry their bitmap above.
                        reschedule
                            .vnode_bitmap_updates
                            .iter()
                            .filter(|(actor_id, _)| {
                                !reschedule.newly_created_actors.contains_key(*actor_id)
                            })
                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
                            .collect(),
                        reschedule.actor_splits.clone(),
                    );
                }

                let (table_ids, node_actors) = self.collect_base_info();

                // Actors to create
                let actors_to_create = Some(Command::reschedule_actors_to_create(
                    reschedules,
                    fragment_actors,
                    &self.database_info,
                    partial_graph_manager.control_stream_manager(),
                ));

                // Post-apply: remove old actors
                self.database_info
                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
                        (
                            *fragment_id,
                            reschedule.removed_actors.iter().cloned().collect(),
                        )
                    }));

                // Mutation
                let mutation = Command::reschedule_to_mutation(
                    reschedules,
                    fragment_actors,
                    partial_graph_manager.control_stream_manager(),
                    &mut self.database_info,
                )?;

                // Consume the plan to move `reschedules` into the post-collect
                // command (the borrow above only had `as_ref`).
                let reschedules = reschedule_plan
                    .expect("reschedule intent should be resolved in global barrier worker")
                    .reschedules;
                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::Reschedule { reschedules },
                )
            }
930
            Some(Command::ReplaceStreamJob(plan)) => {
                // Phase 1: render actors for the replacement fragments, honoring
                // no-shuffle ensembles so paired fragments get aligned actors.
                let ensembles = resolve_no_shuffle_ensembles(
                    &plan.new_fragments,
                    &plan.upstream_fragment_downstreams,
                )?;
                let mut render_result = render_actors(
                    &plan.new_fragments,
                    &self.database_info,
                    "", // replace jobs don't need mview definition
                    &plan.new_fragments.inner.ctx,
                    &plan.streaming_job_model,
                    partial_graph_manager
                        .control_stream_manager()
                        .env
                        .actor_id_generator(),
                    worker_nodes,
                    adaptive_parallelism_strategy,
                    &ensembles,
                    &plan.database_resource_group,
                )?;

                // Render actors for auto_refresh_schema_sinks.
                // Each sink's new_fragment inherits parallelism from its original_fragment.
                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                    let actor_id_counter = partial_graph_manager
                        .control_stream_manager()
                        .env
                        .actor_id_generator();
                    for sink_ctx in sinks {
                        let original_fragment_id = sink_ctx.original_fragment.fragment_id;
                        let original_frag_info = self.database_info.fragment(original_fragment_id);
                        let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
                            original_frag_info,
                        );
                        let new_aligner = ComponentFragmentAligner::new_persistent(
                            &actor_template,
                            actor_id_counter,
                        );
                        let distribution_type: DistributionType =
                            sink_ctx.new_fragment.distribution_type.into();
                        let actor_assignments =
                            new_aligner.align_component_actor(distribution_type);
                        let new_fragment_id = sink_ctx.new_fragment.fragment_id;
                        let mut actors = Vec::with_capacity(actor_assignments.len());
                        for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
                            render_result.actor_location.insert(actor_id, *worker_id);
                            actors.push(StreamActor {
                                actor_id,
                                fragment_id: new_fragment_id,
                                vnode_bitmap: vnode_bitmap.clone(),
                                mview_definition: String::new(),
                                expr_context: Some(sink_ctx.ctx.to_expr_context()),
                                config_override: sink_ctx.ctx.config_override.clone(),
                            });
                        }
                        render_result.stream_actors.insert(new_fragment_id, actors);
                    }
                }

                // Build edges first (needed for no-shuffle mapping used in split resolution)
                let mut edges = self.database_info.build_edge(
                    None,
                    Some(&plan),
                    None,
                    partial_graph_manager.control_stream_manager(),
                    &render_result.stream_actors,
                    &render_result.actor_location,
                );

                // Phase 2: Resolve splits to actor-level assignment.
                let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
                    .stream_actors
                    .iter()
                    .map(|(fragment_id, actors)| {
                        (
                            *fragment_id,
                            actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
                        )
                    })
                    .collect();
                let resolved_split_assignment = match &plan.split_plan {
                    // Splits were freshly discovered: distribute them over the
                    // newly rendered actors.
                    ReplaceJobSplitPlan::Discovered(discovered) => {
                        SourceManager::resolve_fragment_to_actor_splits(
                            &plan.new_fragments,
                            discovered,
                            &fragment_actor_ids,
                        )?
                    }
                    // Inherit splits from the fragments being replaced, looked up
                    // through the no-shuffle actor mapping.
                    ReplaceJobSplitPlan::AlignFromPrevious => {
                        SourceManager::resolve_replace_source_splits(
                            &plan.new_fragments,
                            &plan.replace_upstream,
                            edges.actor_new_no_shuffle(),
                            |_fragment_id, actor_id| {
                                self.database_info.fragment_infos().find_map(|fragment| {
                                    fragment
                                        .actors
                                        .get(&actor_id)
                                        .map(|info| info.splits.clone())
                                })
                            },
                        )?
                    }
                };

                // Pre-apply: add new fragments and replace upstream
                self.database_info.pre_apply_new_fragments(
                    plan.new_fragments
                        .new_fragment_info(
                            &render_result.stream_actors,
                            &render_result.actor_location,
                            &resolved_split_assignment,
                        )
                        .map(|(fragment_id, new_fragment)| {
                            (fragment_id, plan.streaming_job.id(), new_fragment)
                        }),
                );
                for (fragment_id, replace_map) in &plan.replace_upstream {
                    self.database_info
                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
                }
                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                    self.database_info
                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
                            (
                                sink.new_fragment.fragment_id,
                                sink.original_sink.id.as_job_id(),
                                sink.new_fragment_info(
                                    &render_result.stream_actors,
                                    &render_result.actor_location,
                                ),
                            )
                        }));
                }

                let (table_ids, node_actors) = self.collect_base_info();

                // Actors to create
                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
                    &plan,
                    &mut edges,
                    &self.database_info,
                    &render_result.stream_actors,
                    &render_result.actor_location,
                ));

                // Mutation (must be generated before removing old fragments,
                // because it reads actor info from database_info)
                let mutation = Command::replace_stream_job_to_mutation(
                    &plan,
                    &mut edges,
                    &mut self.database_info,
                    &resolved_split_assignment,
                )?;

                // Post-apply: remove old fragments
                {
                    let mut fragment_ids_to_remove: Vec<_> = plan
                        .old_fragments
                        .fragments
                        .values()
                        .map(|f| f.fragment_id)
                        .collect();
                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
                        fragment_ids_to_remove
                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
                    }
                    self.database_info
                        .post_apply_remove_fragments(fragment_ids_to_remove);
                }

                (
                    mutation,
                    table_ids,
                    actors_to_create,
                    node_actors,
                    PostCollectCommand::ReplaceStreamJob {
                        plan,
                        resolved_split_assignment,
                    },
                )
            }
1113
            Some(Command::SourceChangeSplit(split_state)) => {
                // Pre-apply: split assignments
                self.database_info.pre_apply_split_assignments(
                    split_state
                        .split_assignment
                        .iter()
                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
                );

                let mutation = Some(Command::source_change_split_to_mutation(
                    &split_state.split_assignment,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::SourceChangeSplit {
                        split_assignment: split_state.split_assignment,
                    },
                )
            }

            Some(Command::CreateSubscription {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Register the subscriber on the upstream MV before building the
                // mutation so the in-flight info reflects the new subscription.
                self.database_info.register_subscriber(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    SubscriberType::Subscription(retention_second),
                );
                let mutation = Some(Command::create_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::CreateSubscription { subscription_id },
                )
            }

            Some(Command::DropSubscription {
                subscription_id,
                upstream_mv_table_id,
            }) => {
                // A missing registration is tolerated (warn only) — the drop
                // mutation is still emitted downstream.
                if self
                    .database_info
                    .unregister_subscriber(
                        upstream_mv_table_id.as_job_id(),
                        subscription_id.as_subscriber_id(),
                    )
                    .is_none()
                {
                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
                }
                let mutation = Some(Command::drop_subscription_to_mutation(
                    upstream_mv_table_id,
                    subscription_id,
                ));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::Command("DropSubscription".to_owned()),
                )
            }

            Some(Command::AlterSubscriptionRetention {
                subscription_id,
                upstream_mv_table_id,
                retention_second,
            }) => {
                // Pure metadata update: no mutation needs to flow through the graph.
                self.database_info.update_subscription_retention(
                    upstream_mv_table_id.as_job_id(),
                    subscription_id.as_subscriber_id(),
                    retention_second,
                );
                self.apply_simple_command(None, "AlterSubscriptionRetention")
            }

            Some(Command::ConnectorPropsChange(config)) => {
                let mutation = Some(Command::connector_props_change_to_mutation(&config));
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ConnectorPropsChange(config),
                )
            }

            Some(Command::Refresh {
                table_id,
                associated_source_id,
            }) => {
                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
                self.apply_simple_command(mutation, "Refresh")
            }

            Some(Command::ListFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "ListFinish")
            }

            Some(Command::LoadFinish {
                table_id: _,
                associated_source_id,
            }) => {
                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
                self.apply_simple_command(mutation, "LoadFinish")
            }

            Some(Command::ResetSource { source_id }) => {
                let mutation = Some(Command::reset_source_to_mutation(source_id));
                self.apply_simple_command(mutation, "ResetSource")
            }

            Some(Command::ResumeBackfill { target }) => {
                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
                let (table_ids, node_actors) = self.collect_base_info();
                (
                    mutation,
                    table_ids,
                    None,
                    node_actors,
                    PostCollectCommand::ResumeBackfill { target },
                )
            }

            Some(Command::InjectSourceOffsets {
                source_id,
                split_offsets,
            }) => {
                let mutation = Some(Command::inject_source_offsets_to_mutation(
                    source_id,
                    &split_offsets,
                ));
                self.apply_simple_command(mutation, "InjectSourceOffsets")
            }
        };
1267
        let mut finished_snapshot_backfill_jobs = HashSet::new();
        // If the command produced no mutation, this checkpoint barrier may instead
        // carry the "merge to upstream" transition for snapshot-backfill jobs that
        // have caught up.
        let mutation = match mutation {
            Some(mutation) => Some(mutation),
            None => {
                let mut finished_snapshot_backfill_job_info = HashMap::new();
                if barrier_info.kind.is_checkpoint() {
                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
                        if creating_job.should_merge_to_upstream() {
                            let info = creating_job
                                .start_consume_upstream(partial_graph_manager, &barrier_info)?;
                            finished_snapshot_backfill_job_info
                                .try_insert(job_id, info)
                                .expect("non-duplicated");
                        }
                    }
                }

                if !finished_snapshot_backfill_job_info.is_empty() {
                    let actors_to_create = actors_to_create.get_or_insert_default();
                    let mut subscriptions_to_drop = vec![];
                    let mut dispatcher_update = vec![];
                    let mut actor_splits = HashMap::new();
                    for (job_id, info) in finished_snapshot_backfill_job_info {
                        finished_snapshot_backfill_jobs.insert(job_id);
                        // The implicit snapshot-backfill subscriptions on the upstream
                        // MVs are dropped now that the job consumes upstream directly.
                        subscriptions_to_drop.extend(
                            info.snapshot_backfill_upstream_tables.iter().map(
                                |upstream_table_id| PbSubscriptionUpstreamInfo {
                                    subscriber_id: job_id.as_subscriber_id(),
                                    upstream_mv_table_id: *upstream_table_id,
                                },
                            ),
                        );
                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
                            assert_matches!(
                                self.database_info.unregister_subscriber(
                                    upstream_mv_table_id.as_job_id(),
                                    job_id.as_subscriber_id()
                                ),
                                Some(SubscriberType::SnapshotBackfill)
                            );
                        }

                        // The finished job's state tables now commit with the
                        // database's own checkpoint.
                        table_ids_to_commit.extend(
                            info.fragment_infos
                                .values()
                                .flat_map(|fragment| fragment.state_table_ids.iter())
                                .copied(),
                        );

                        // Re-key every actor of the finished job with freshly
                        // allocated global actor ids.
                        let actor_len = info
                            .fragment_infos
                            .values()
                            .map(|fragment| fragment.actors.len() as u64)
                            .sum();
                        let id_gen = GlobalActorIdGen::new(
                            partial_graph_manager
                                .control_stream_manager()
                                .env
                                .actor_id_generator(),
                            actor_len,
                        );
                        let mut next_local_actor_id = 0;
                        // mapping from old_actor_id to new_actor_id
                        let actor_mapping: HashMap<_, _> = info
                            .fragment_infos
                            .values()
                            .flat_map(|fragment| fragment.actors.keys())
                            .map(|old_actor_id| {
                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
                                next_local_actor_id += 1;
                                (*old_actor_id, new_actor_id.as_global_id())
                            })
                            .collect();
                        let actor_mapping = &actor_mapping;
                        // Rewrite stream actors under their new ids.
                        let new_stream_actors: HashMap<_, _> = info
                            .stream_actors
                            .into_iter()
                            .map(|(old_actor_id, mut actor)| {
                                let new_actor_id = actor_mapping[&old_actor_id];
                                actor.actor_id = new_actor_id;
                                (new_actor_id, actor)
                            })
                            .collect();
                        // Rewrite fragment-level actor maps under the new ids as well.
                        let new_fragment_info: HashMap<_, _> = info
                            .fragment_infos
                            .into_iter()
                            .map(|(fragment_id, mut fragment)| {
                                let actors = take(&mut fragment.actors);
                                fragment.actors = actors
                                    .into_iter()
                                    .map(|(old_actor_id, actor)| {
                                        let new_actor_id = actor_mapping[&old_actor_id];
                                        (new_actor_id, actor)
                                    })
                                    .collect();
                                (fragment_id, fragment)
                            })
                            .collect();
                        actor_splits.extend(
                            new_fragment_info
                                .values()
                                .flat_map(|fragment| &fragment.actors)
                                .map(|(actor_id, actor)| {
                                    (
                                        *actor_id,
                                        ConnectorSplits {
                                            splits: actor
                                                .splits
                                                .iter()
                                                .map(ConnectorSplit::from)
                                                .collect(),
                                        },
                                    )
                                }),
                        );
                        // new actors belong to the database partial graph
                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
                        // Build edges spanning the existing upstream fragments and the
                        // re-keyed fragments of the finished job.
                        let mut edge_builder = FragmentEdgeBuilder::new(
                            info.upstream_fragment_downstreams
                                .keys()
                                .map(|upstream_fragment_id| {
                                    self.database_info.fragment(*upstream_fragment_id)
                                })
                                .chain(new_fragment_info.values())
                                .map(|fragment| {
                                    (
                                        fragment.fragment_id,
                                        EdgeBuilderFragmentInfo::from_inflight(
                                            fragment,
                                            partial_graph_id,
                                            partial_graph_manager.control_stream_manager(),
                                        ),
                                    )
                                }),
                        );
                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
                        edge_builder.add_relations(&info.downstreams);
                        let mut edges = edge_builder.build();
                        let new_actors_to_create = edges.collect_actors_to_create(
                            new_fragment_info.values().map(|fragment| {
                                (
                                    fragment.fragment_id,
                                    &fragment.nodes,
                                    fragment.actors.iter().map(|(actor_id, actor)| {
                                        (&new_stream_actors[actor_id], actor.worker_id)
                                    }),
                                    [], // no initial subscriber for backfilling job
                                )
                            }),
                        );
                        dispatcher_update.extend(
                            info.upstream_fragment_downstreams.keys().flat_map(
1420                                |upstream_fragment_id| {
1421                                    let new_actor_dispatchers = edges
1422                                        .dispatchers
1423                                        .remove(upstream_fragment_id)
1424                                        .expect("should exist");
1425                                    new_actor_dispatchers.into_iter().flat_map(
1426                                        |(upstream_actor_id, dispatchers)| {
1427                                            dispatchers.into_iter().map(move |dispatcher| {
1428                                                PbDispatcherUpdate {
1429                                                    actor_id: upstream_actor_id,
1430                                                    dispatcher_id: dispatcher.dispatcher_id,
1431                                                    hash_mapping: dispatcher.hash_mapping,
1432                                                    removed_downstream_actor_id: dispatcher
1433                                                        .downstream_actor_id
1434                                                        .iter()
1435                                                        .map(|new_downstream_actor_id| {
1436                                                            actor_mapping
1437                                                            .iter()
1438                                                            .find_map(
1439                                                                |(old_actor_id, new_actor_id)| {
1440                                                                    (new_downstream_actor_id
1441                                                                        == new_actor_id)
1442                                                                        .then_some(*old_actor_id)
1443                                                                },
1444                                                            )
1445                                                            .expect("should exist")
1446                                                        })
1447                                                        .collect(),
1448                                                    added_downstream_actor_id: dispatcher
1449                                                        .downstream_actor_id,
1450                                                }
1451                                            })
1452                                        },
1453                                    )
1454                                },
1455                            ),
1456                        );
1457                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1458                        for (worker_id, worker_actors) in new_actors_to_create {
1459                            node_actors.entry(worker_id).or_default().extend(
1460                                worker_actors.values().flat_map(|(_, actors, _)| {
1461                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
1462                                }),
1463                            );
1464                            actors_to_create
1465                                .entry(worker_id)
1466                                .or_default()
1467                                .extend(worker_actors);
1468                        }
1469                        self.database_info.add_existing(InflightStreamingJobInfo {
1470                            job_id,
1471                            fragment_infos: new_fragment_info,
1472                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
1473                            status: CreateStreamingJobStatus::Created,
1474                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
1475                        });
1476                    }
1477
1478                    Some(PbMutation::Update(PbUpdateMutation {
1479                        dispatcher_update,
1480                        merge_update: vec![], // no upstream update on existing actors
1481                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
1482                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
1483                        actor_splits,
1484                        actor_new_dispatchers: Default::default(), // no new dispatcher
1485                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
1486                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
1487                        subscriptions_to_drop,
1488                    }))
1489                } else {
1490                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
1491                    if fragment_ids.is_empty() {
1492                        None
1493                    } else {
1494                        Some(PbMutation::StartFragmentBackfill(
1495                            PbStartFragmentBackfillMutation { fragment_ids },
1496                        ))
1497                    }
1498                }
1499            }
1500        };
1501
1502        // Forward barrier to creating streaming job controls
1503        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
1504            if !finished_snapshot_backfill_jobs.contains(job_id) {
1505                let throttle_mutation = if let Some((ref jobs, ref config)) =
1506                    throttle_for_creating_jobs
1507                    && jobs.contains(job_id)
1508                {
1509                    assert_eq!(
1510                        jobs.len(),
1511                        1,
1512                        "should not alter rate limit of snapshot backfill job with other jobs"
1513                    );
1514                    Some((
1515                        Mutation::Throttle(ThrottleMutation {
1516                            fragment_throttle: config
1517                                .iter()
1518                                .map(|(fragment_id, config)| (*fragment_id, *config))
1519                                .collect(),
1520                        }),
1521                        take(notifiers),
1522                    ))
1523                } else {
1524                    None
1525                };
1526                creating_job.on_new_upstream_barrier(
1527                    partial_graph_manager,
1528                    &barrier_info,
1529                    throttle_mutation,
1530                )?;
1531            }
1532        }
1533
1534        partial_graph_manager.inject_barrier(
1535            to_partial_graph_id(self.database_id, None),
1536            mutation,
1537            &node_actors,
1538            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1539            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1540            actors_to_create,
1541            PartialGraphBarrierInfo::new(
1542                post_collect_command,
1543                barrier_info,
1544                take(notifiers),
1545                table_ids_to_commit,
1546            ),
1547        )?;
1548
1549        Ok(ApplyCommandInfo {
1550            jobs_to_wait: finished_snapshot_backfill_jobs,
1551        })
1552    }
1553}