Skip to main content

risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19use std::sync::atomic::AtomicU32;
20
21use risingwave_common::bail;
22use risingwave_common::bitmap::Bitmap;
23use risingwave_common::catalog::TableId;
24use risingwave_common::hash::VnodeCountCompat;
25use risingwave_common::id::JobId;
26use risingwave_common::util::epoch::Epoch;
27use risingwave_meta_model::fragment::DistributionType;
28use risingwave_meta_model::{DispatcherType, WorkerId, streaming_job};
29use risingwave_pb::common::WorkerNode;
30use risingwave_pb::hummock::HummockVersionStats;
31use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
32use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
33use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
34use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
35use risingwave_pb::stream_plan::{
36    AddMutation, PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
37    PbUpstreamSinkInfo, ThrottleMutation,
38};
39use tracing::warn;
40
41use crate::MetaResult;
42use crate::barrier::cdc_progress::CdcTableBackfillTracker;
43use crate::barrier::checkpoint::{
44    BatchRefreshJobCheckpointControl, BatchRefreshLogicalFragments, CreatingStreamingJobControl,
45    DatabaseCheckpointControl, IndependentCheckpointJobControl,
46};
47use crate::barrier::command::{CreateStreamingJobCommandInfo, PostCollectCommand, ReschedulePlan};
48use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
49use crate::barrier::edge_builder::{EdgeBuilderFragmentInfo, FragmentEdgeBuilder};
50use crate::barrier::info::{
51    BarrierInfo, CreateStreamingJobStatus, InflightDatabaseInfo, InflightStreamingJobInfo,
52    SubscriberType,
53};
54use crate::barrier::notifier::Notifier;
55use crate::barrier::partial_graph::{PartialGraphBarrierInfo, PartialGraphManager};
56use crate::barrier::rpc::to_partial_graph_id;
57use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
58use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
59use crate::controller::scale::{
60    ComponentFragmentAligner, EnsembleActorTemplate, LoadedFragment, NoShuffleEnsemble,
61    build_no_shuffle_fragment_graph_edges, find_no_shuffle_graphs,
62};
63use crate::model::{
64    ActorId, ActorNewNoShuffle, FragmentDownstreamRelation, FragmentId, StreamActor, StreamContext,
65    StreamJobActorsToCreate, StreamJobFragmentsToCreate,
66};
67use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
68use crate::stream::{
69    GlobalActorIdGen, ReplaceJobSplitPlan, SourceManager, SplitAssignment,
70    fill_snapshot_backfill_epoch,
71};
72
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
///
/// Tracks the epoch of the most recently injected barrier, the `prev_epoch`s accumulated
/// since the last checkpoint barrier, and whether the cluster is paused.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers
    ///
    /// Every call to `next_barrier_info` appends one entry; the list is drained and handed
    /// to the barrier kind when a checkpoint barrier is produced.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    is_paused: bool,
}
87
88impl BarrierWorkerState {
89    pub(super) fn new() -> Self {
90        Self {
91            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
92            pending_non_checkpoint_barriers: vec![],
93            is_paused: false,
94        }
95    }
96
97    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
98        Self {
99            in_flight_prev_epoch,
100            pending_non_checkpoint_barriers: vec![],
101            is_paused,
102        }
103    }
104
105    pub fn is_paused(&self) -> bool {
106        self.is_paused
107    }
108
109    fn set_is_paused(&mut self, is_paused: bool) {
110        if self.is_paused != is_paused {
111            tracing::info!(
112                currently_paused = self.is_paused,
113                newly_paused = is_paused,
114                "update paused state"
115            );
116            self.is_paused = is_paused;
117        }
118    }
119
120    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
121        &self.in_flight_prev_epoch
122    }
123
124    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
125    pub fn next_barrier_info(
126        &mut self,
127        is_checkpoint: bool,
128        curr_epoch: TracedEpoch,
129    ) -> BarrierInfo {
130        assert!(
131            self.in_flight_prev_epoch.value() < curr_epoch.value(),
132            "curr epoch regress. {} > {}",
133            self.in_flight_prev_epoch.value(),
134            curr_epoch.value()
135        );
136        let prev_epoch = self.in_flight_prev_epoch.clone();
137        self.in_flight_prev_epoch = curr_epoch.clone();
138        self.pending_non_checkpoint_barriers
139            .push(prev_epoch.value().0);
140        let kind = if is_checkpoint {
141            let epochs = take(&mut self.pending_non_checkpoint_barriers);
142            BarrierKind::Checkpoint(epochs)
143        } else {
144            BarrierKind::Barrier
145        };
146        BarrierInfo {
147            prev_epoch,
148            curr_epoch,
149            kind,
150        }
151    }
152}
153
/// Value returned by `DatabaseCheckpointControl::apply_command` on success.
pub(super) struct ApplyCommandInfo {
    /// Ids of the jobs to wait for.
    // NOTE(review): presumably the jobs whose completion gates this barrier — confirm
    // against the part of `apply_command` that populates it.
    pub jobs_to_wait: HashSet<JobId>,
}
157
/// Result tuple of `apply_command`: mutation, table IDs to commit, actors to create,
/// node actors, and post-collect command.
///
/// Each arm of the command `match` in `apply_command` evaluates to this tuple.
type ApplyCommandResult = (
    // Mutation to attach to the barrier, if any.
    Option<Mutation>,
    // Table IDs to commit.
    HashSet<TableId>,
    // Actors to create, if any.
    Option<StreamJobActorsToCreate>,
    // Actor IDs to collect, grouped by worker.
    HashMap<WorkerId, HashSet<ActorId>>,
    // Post-collect command.
    PostCollectCommand,
);
167
/// Result of actor rendering for a create/replace streaming job.
///
/// Produced by `render_actors`; every actor listed in `stream_actors` has an entry in
/// `actor_location`.
pub(crate) struct RenderResult {
    /// Rendered actors grouped by fragment.
    pub stream_actors: HashMap<FragmentId, Vec<StreamActor>>,
    /// Worker placement for each actor.
    pub actor_location: HashMap<ActorId, WorkerId>,
}
175
176/// Derive `NoShuffle` edges from fragment downstream relations and resolve ensembles.
177///
178/// This scans both the internal downstream relations (`fragments.downstreams`) and
179/// the cross-boundary upstream-to-new-fragment relations (`upstream_fragment_downstreams`)
180/// to find all `NoShuffle` edges. It then runs BFS to find connected components (ensembles)
181/// and categorizes them into:
182/// - Ensembles whose entry fragments include existing (non-new) fragments
183/// - Ensembles whose entry fragments are all newly created
184pub(crate) fn resolve_no_shuffle_ensembles(
185    fragments: &StreamJobFragmentsToCreate,
186    upstream_fragment_downstreams: &FragmentDownstreamRelation,
187) -> MetaResult<Vec<NoShuffleEnsemble>> {
188    // Derive FragmentNewNoShuffle from the two downstream relation maps.
189    let mut new_no_shuffle: HashMap<_, HashSet<_>> = HashMap::new();
190
191    // Internal edges (new → new) and edges from new → existing downstream (replace job).
192    for (upstream_fid, relations) in &fragments.downstreams {
193        for rel in relations {
194            if rel.dispatcher_type == DispatcherType::NoShuffle {
195                new_no_shuffle
196                    .entry(*upstream_fid)
197                    .or_default()
198                    .insert(rel.downstream_fragment_id);
199            }
200        }
201    }
202
203    // Cross-boundary edges: existing upstream → new downstream.
204    for (upstream_fid, relations) in upstream_fragment_downstreams {
205        for rel in relations {
206            if rel.dispatcher_type == DispatcherType::NoShuffle {
207                new_no_shuffle
208                    .entry(*upstream_fid)
209                    .or_default()
210                    .insert(rel.downstream_fragment_id);
211            }
212        }
213    }
214
215    let mut ensembles = if new_no_shuffle.is_empty() {
216        Vec::new()
217    } else {
218        // Flatten into directed edge pairs for BFS.
219        let no_shuffle_edges: Vec<(FragmentId, FragmentId)> = new_no_shuffle
220            .iter()
221            .flat_map(|(upstream_fid, downstream_fids)| {
222                downstream_fids
223                    .iter()
224                    .map(move |downstream_fid| (*upstream_fid, *downstream_fid))
225            })
226            .collect();
227
228        let all_fragment_ids: Vec<FragmentId> = no_shuffle_edges
229            .iter()
230            .flat_map(|(u, d)| [*u, *d])
231            .collect::<HashSet<_>>()
232            .into_iter()
233            .collect();
234
235        let (fwd, bwd) = build_no_shuffle_fragment_graph_edges(no_shuffle_edges);
236        find_no_shuffle_graphs(&all_fragment_ids, &fwd, &bwd)?
237    };
238
239    // Add standalone fragments (not covered by any ensemble) as single-fragment ensembles.
240    let covered: HashSet<FragmentId> = ensembles
241        .iter()
242        .flat_map(|e| e.component_fragments())
243        .collect();
244    for fragment_id in fragments.inner.fragments.keys() {
245        if !covered.contains(fragment_id) {
246            ensembles.push(NoShuffleEnsemble::singleton(*fragment_id));
247        }
248    }
249
250    Ok(ensembles)
251}
252
253/// Render actors for a create or replace streaming job.
254///
255/// This determines the parallelism for each no-shuffle ensemble (either from an existing
256/// inflight upstream or computed fresh), and produces `StreamActor` instances with worker
257/// placements and actor-level no-shuffle mappings.
258///
259/// The process follows three steps:
260/// 1. For each ensemble, resolve `EnsembleActorTemplate` (from existing or fresh).
261/// 2. For each new component fragment, allocate actor IDs and compute worker/vnode assignments.
262/// 3. Expand the simple assignments into full `StreamActor` structures.
263pub(super) fn render_actors(
264    fragments: &StreamJobFragmentsToCreate,
265    database_info: &InflightDatabaseInfo,
266    definition: &str,
267    ctx: &StreamContext,
268    streaming_job_model: &streaming_job::Model,
269    actor_id_counter: &AtomicU32,
270    worker_map: &HashMap<WorkerId, WorkerNode>,
271    ensembles: &[NoShuffleEnsemble],
272    database_resource_group: &str,
273) -> MetaResult<RenderResult> {
274    // Step 2: Render actors for each ensemble.
275    // For each new fragment, produce a simple assignment: actor_id -> (worker_id, vnode_bitmap).
276    let mut actor_assignments: HashMap<FragmentId, HashMap<ActorId, (WorkerId, Option<Bitmap>)>> =
277        HashMap::new();
278
279    for ensemble in ensembles {
280        // Determine the EnsembleActorTemplate for this ensemble.
281        //
282        // Check if any component fragment in the ensemble already exists (i.e. is inflight).
283        // If so, derive the actor assignment from an existing fragment. Otherwise render fresh.
284        let existing_fragment_ids: Vec<FragmentId> = ensemble
285            .component_fragments()
286            .filter(|fragment_id| !fragments.inner.fragments.contains_key(fragment_id))
287            .collect();
288
289        let actor_template = if let Some(&first_existing) = existing_fragment_ids.first() {
290            let template = EnsembleActorTemplate::from_existing_inflight_fragment(
291                database_info.fragment(first_existing),
292            );
293
294            // Sanity check: all existing fragments in the same ensemble must be aligned —
295            // same actor count and same worker placement per vnode.
296            for &other_fragment_id in &existing_fragment_ids[1..] {
297                let other = EnsembleActorTemplate::from_existing_inflight_fragment(
298                    database_info.fragment(other_fragment_id),
299                );
300                template.assert_aligned_with(&other, first_existing, other_fragment_id);
301            }
302
303            template
304        } else {
305            // All fragments are new — render from scratch.
306            let first_component = ensemble
307                .component_fragments()
308                .next()
309                .expect("ensemble must have at least one component");
310            let fragment = &fragments.inner.fragments[&first_component];
311            let distribution_type: DistributionType = fragment.distribution_type.into();
312            let vnode_count = fragment.vnode_count();
313
314            // Assert all component fragments in this ensemble share the same vnode count.
315            for fragment_id in ensemble.component_fragments() {
316                let f = &fragments.inner.fragments[&fragment_id];
317                assert_eq!(
318                    vnode_count,
319                    f.vnode_count(),
320                    "component fragments {} and {} in the same no-shuffle ensemble have \
321                     different vnode counts: {} vs {}",
322                    first_component,
323                    fragment_id,
324                    vnode_count,
325                    f.vnode_count(),
326                );
327            }
328
329            EnsembleActorTemplate::render_new(
330                streaming_job_model,
331                worker_map,
332                None,
333                database_resource_group.to_owned(),
334                distribution_type,
335                vnode_count,
336            )?
337        };
338
339        // Render each new component fragment in this ensemble.
340        for fragment_id in ensemble.component_fragments() {
341            if !fragments.inner.fragments.contains_key(&fragment_id) {
342                continue; // Skip existing fragments.
343            }
344            let fragment = &fragments.inner.fragments[&fragment_id];
345            let distribution_type: DistributionType = fragment.distribution_type.into();
346            let aligner =
347                ComponentFragmentAligner::new_persistent(&actor_template, actor_id_counter);
348            let assignments = aligner.align_component_actor(distribution_type);
349            actor_assignments.insert(fragment_id, assignments);
350        }
351    }
352
353    // Step 3: Expand simple assignments into full StreamActor structures.
354    let mut result_stream_actors: HashMap<FragmentId, Vec<StreamActor>> = HashMap::new();
355    let mut result_actor_location: HashMap<ActorId, WorkerId> = HashMap::new();
356
357    for (fragment_id, assignments) in &actor_assignments {
358        let mut actors = Vec::with_capacity(assignments.len());
359        for (&actor_id, (worker_id, vnode_bitmap)) in assignments {
360            result_actor_location.insert(actor_id, *worker_id);
361            actors.push(StreamActor {
362                actor_id,
363                fragment_id: *fragment_id,
364                vnode_bitmap: vnode_bitmap.clone(),
365                mview_definition: definition.to_owned(),
366                expr_context: Some(ctx.to_expr_context()),
367                config_override: ctx.config_override.clone(),
368            });
369        }
370        result_stream_actors.insert(*fragment_id, actors);
371    }
372
373    Ok(RenderResult {
374        stream_actors: result_stream_actors,
375        actor_location: result_actor_location,
376    })
377}
378impl DatabaseCheckpointControl {
379    /// Collect table IDs to commit and actor IDs to collect from current fragment infos.
380    fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
381        let table_ids_to_commit = self.database_info.existing_table_ids().collect();
382        let node_actors =
383            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
384        (table_ids_to_commit, node_actors)
385    }
386
387    /// Helper for the simplest command variants: those that only need a
388    /// pre-computed mutation and a command name, with no actors to create
389    /// and no additional side effects on `self`.
390    fn apply_simple_command(
391        &self,
392        mutation: Option<Mutation>,
393        command_name: &'static str,
394    ) -> ApplyCommandResult {
395        let (table_ids, node_actors) = self.collect_base_info();
396        (
397            mutation,
398            table_ids,
399            None,
400            node_actors,
401            PostCollectCommand::Command(command_name.to_owned()),
402        )
403    }
404
405    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
406    /// will be removed from the state after the info get resolved.
407    pub(super) fn apply_command(
408        &mut self,
409        command: Option<Command>,
410        notifiers: &mut Vec<Notifier>,
411        barrier_info: BarrierInfo,
412        partial_graph_manager: &mut PartialGraphManager,
413        hummock_version_stats: &HummockVersionStats,
414        worker_nodes: &HashMap<WorkerId, WorkerNode>,
415    ) -> MetaResult<ApplyCommandInfo> {
416        debug_assert!(
417            !matches!(
418                command,
419                Some(Command::RescheduleIntent {
420                    reschedule_plan: None,
421                    ..
422                })
423            ),
424            "reschedule intent must be resolved before apply"
425        );
426        if matches!(
427            command,
428            Some(Command::RescheduleIntent {
429                reschedule_plan: None,
430                ..
431            })
432        ) {
433            bail!("reschedule intent must be resolved before apply");
434        }
435
436        /// Resolve source splits for a create streaming job command.
437        ///
438        /// Combines source fragment split resolution and backfill split alignment
439        /// into one step, looking up existing upstream actor splits from the inflight database info.
440        fn resolve_source_splits(
441            info: &CreateStreamingJobCommandInfo,
442            render_result: &RenderResult,
443            actor_no_shuffle: &ActorNewNoShuffle,
444            database_info: &InflightDatabaseInfo,
445        ) -> MetaResult<SplitAssignment> {
446            let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
447                .stream_actors
448                .iter()
449                .map(|(fragment_id, actors)| {
450                    (
451                        *fragment_id,
452                        actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
453                    )
454                })
455                .collect();
456            let mut resolved = SourceManager::resolve_fragment_to_actor_splits(
457                &info.stream_job_fragments,
458                &info.init_split_assignment,
459                &fragment_actor_ids,
460            )?;
461            resolved.extend(SourceManager::resolve_backfill_splits(
462                &info.stream_job_fragments,
463                actor_no_shuffle,
464                |fragment_id, actor_id| {
465                    database_info
466                        .fragment(fragment_id)
467                        .actors
468                        .get(&actor_id)
469                        .map(|info| info.splits.clone())
470                },
471            )?);
472            Ok(resolved)
473        }
474
475        // Throttle data for creating jobs (set only in the Throttle arm)
476        let mut throttle_for_creating_jobs: Option<(
477            HashSet<JobId>,
478            HashMap<FragmentId, ThrottleConfig>,
479        )> = None;
480
481        // Each variant handles its own pre-apply, edge building, mutation generation,
482        // collect base info, and post-apply. The match produces values consumed by the
483        // common snapshot-backfill-merging code that follows.
484        let (
485            mutation,
486            mut table_ids_to_commit,
487            mut actors_to_create,
488            mut node_actors,
489            post_collect_command,
490        ) = match command {
491            None => self.apply_simple_command(None, "barrier"),
492            Some(Command::CreateStreamingJob {
493                mut info,
494                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
495                cross_db_snapshot_backfill_info,
496            }) => {
497                let ensembles = resolve_no_shuffle_ensembles(
498                    &info.stream_job_fragments,
499                    &info.upstream_fragment_downstreams,
500                )?;
501                let actors = render_actors(
502                    &info.stream_job_fragments,
503                    &self.database_info,
504                    &info.definition,
505                    &info.stream_job_fragments.inner.ctx,
506                    &info.streaming_job_model,
507                    partial_graph_manager
508                        .control_stream_manager()
509                        .env
510                        .actor_id_generator(),
511                    worker_nodes,
512                    &ensembles,
513                    &info.database_resource_group,
514                )?;
515                {
516                    assert!(!self.state.is_paused());
517                    let snapshot_epoch = barrier_info.prev_epoch();
518                    // set snapshot epoch of upstream table for snapshot backfill
519                    for snapshot_backfill_epoch in snapshot_backfill_info
520                        .upstream_mv_table_id_to_backfill_epoch
521                        .values_mut()
522                    {
523                        assert_eq!(
524                            snapshot_backfill_epoch.replace(snapshot_epoch),
525                            None,
526                            "must not set previously"
527                        );
528                    }
529                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
530                        fill_snapshot_backfill_epoch(
531                            &mut fragment.nodes,
532                            Some(&snapshot_backfill_info),
533                            &cross_db_snapshot_backfill_info,
534                        )?;
535                    }
536                    let job_id = info.stream_job_fragments.stream_job_id();
537                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
538                        .upstream_mv_table_id_to_backfill_epoch
539                        .keys()
540                        .cloned()
541                        .collect();
542
543                    // Build edges first (needed for no-shuffle mapping used in split resolution)
544                    let mut edges = self.database_info.build_edge(
545                        Some((&info, true)),
546                        None,
547                        None,
548                        partial_graph_manager.control_stream_manager(),
549                        &actors.stream_actors,
550                        &actors.actor_location,
551                    );
552                    // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
553                    let resolved_split_assignment = resolve_source_splits(
554                        &info,
555                        &actors,
556                        edges.actor_new_no_shuffle(),
557                        &self.database_info,
558                    )?;
559
560                    let Entry::Vacant(entry) =
561                        self.independent_checkpoint_job_controls.entry(job_id)
562                    else {
563                        panic!("duplicated creating snapshot backfill job {job_id}");
564                    };
565
566                    let job = CreatingStreamingJobControl::new(
567                        entry,
568                        CreateSnapshotBackfillJobCommandInfo {
569                            info: info.clone(),
570                            snapshot_backfill_info: snapshot_backfill_info.clone(),
571                            cross_db_snapshot_backfill_info,
572                            resolved_split_assignment: resolved_split_assignment.clone(),
573                            refresh_interval_sec: None,
574                        },
575                        take(notifiers),
576                        snapshot_backfill_upstream_tables,
577                        snapshot_epoch,
578                        hummock_version_stats,
579                        partial_graph_manager,
580                        &mut edges,
581                        &resolved_split_assignment,
582                        &actors,
583                    )?;
584
585                    if let Some(fragment_infos) = job.fragment_infos() {
586                        self.database_info.shared_actor_infos.upsert(
587                            self.database_id,
588                            fragment_infos.values().map(|f| (f, job_id)),
589                        );
590                    }
591
592                    for upstream_mv_table_id in snapshot_backfill_info
593                        .upstream_mv_table_id_to_backfill_epoch
594                        .keys()
595                    {
596                        self.database_info.register_subscriber(
597                            upstream_mv_table_id.as_job_id(),
598                            info.streaming_job.id().as_subscriber_id(),
599                            SubscriberType::SnapshotBackfill,
600                        );
601                    }
602
603                    let mutation = Command::create_streaming_job_to_mutation(
604                        &info,
605                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
606                        self.state.is_paused(),
607                        &mut edges,
608                        partial_graph_manager.control_stream_manager(),
609                        None,
610                        &resolved_split_assignment,
611                        &actors.stream_actors,
612                        &actors.actor_location,
613                    )?;
614
615                    let (table_ids, node_actors) = self.collect_base_info();
616                    (
617                        Some(mutation),
618                        table_ids,
619                        None,
620                        node_actors,
621                        PostCollectCommand::barrier(),
622                    )
623                }
624            }
625            Some(Command::CreateStreamingJob {
626                mut info,
627                job_type: CreateStreamingJobType::BatchRefresh(mut batch_refresh_info),
628                cross_db_snapshot_backfill_info,
629            }) => {
630                {
631                    if self.state.is_paused() {
632                        bail!("cannot create batch refresh job while database barrier is paused");
633                    }
634                    let snapshot_epoch = barrier_info.prev_epoch();
635                    let job_id = info.stream_job_fragments.stream_job_id();
636                    let database_id = info.streaming_job.database_id();
637
638                    // 1. Fill snapshot backfill epochs.
639                    let snapshot_backfill_info = &mut batch_refresh_info.snapshot_backfill_info;
640                    for snapshot_backfill_epoch in snapshot_backfill_info
641                        .upstream_mv_table_id_to_backfill_epoch
642                        .values_mut()
643                    {
644                        assert_eq!(
645                            snapshot_backfill_epoch.replace(snapshot_epoch),
646                            None,
647                            "must not set previously"
648                        );
649                    }
650                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
651                        fill_snapshot_backfill_epoch(
652                            &mut fragment.nodes,
653                            Some(snapshot_backfill_info),
654                            &cross_db_snapshot_backfill_info,
655                        )?;
656                    }
657                    let snapshot_backfill_upstream_tables: HashSet<TableId> =
658                        snapshot_backfill_info
659                            .upstream_mv_table_id_to_backfill_epoch
660                            .keys()
661                            .cloned()
662                            .collect();
663
664                    // 2. Build BatchRefreshLogicalFragments (after epoch filling).
665                    let logical = BatchRefreshLogicalFragments {
666                        fragments: info
667                            .stream_job_fragments
668                            .inner
669                            .fragments
670                            .iter()
671                            .map(|(&fid, fragment)| {
672                                (
673                                    fid,
674                                    LoadedFragment {
675                                        fragment_id: fid,
676                                        job_id,
677                                        fragment_type_mask: fragment.fragment_type_mask,
678                                        distribution_type: fragment.distribution_type.into(),
679                                        vnode_count: fragment.vnode_count(),
680                                        nodes: fragment.nodes.clone(),
681                                        state_table_ids: fragment
682                                            .state_table_ids
683                                            .iter()
684                                            .cloned()
685                                            .collect(),
686                                        parallelism: None,
687                                    },
688                                )
689                            })
690                            .collect(),
691                        downstreams: info.stream_job_fragments.downstreams.clone(),
692                    };
693
694                    // 3. Create BatchRefreshJobCheckpointControl. `new()` handles actor
695                    //    rendering, the partial-graph initial barrier, and produces the
696                    //    database-graph mutation for the main barrier.
697                    assert!(
698                        !self
699                            .independent_checkpoint_job_controls
700                            .contains_key(&job_id),
701                        "duplicated creating batch refresh job {job_id}"
702                    );
703
704                    let snapshot_backfill_info_clone =
705                        batch_refresh_info.snapshot_backfill_info.clone();
706                    let refresh_interval_sec = batch_refresh_info.refresh_interval_sec;
707
708                    // Database-graph `Add` mutation: batch refresh has no actors in the
709                    // database graph; it only needs to register snapshot-backfill
710                    // subscribers on the upstream MV tables.
711                    let subscriber_id =
712                        info.stream_job_fragments.stream_job_id().as_subscriber_id();
713                    let mutation = Mutation::Add(AddMutation {
714                        actor_dispatchers: Default::default(),
715                        added_actors: Default::default(),
716                        actor_splits: Default::default(),
717                        pause: false,
718                        subscriptions_to_add: snapshot_backfill_info_clone
719                            .upstream_mv_table_id_to_backfill_epoch
720                            .keys()
721                            .map(|table_id| PbSubscriptionUpstreamInfo {
722                                subscriber_id,
723                                upstream_mv_table_id: *table_id,
724                            })
725                            .collect(),
726                        backfill_nodes_to_pause: Default::default(),
727                        actor_cdc_table_snapshot_splits: None,
728                        new_upstream_sinks: Default::default(),
729                    });
730
731                    let job = BatchRefreshJobCheckpointControl::new(
732                        database_id,
733                        job_id,
734                        CreateSnapshotBackfillJobCommandInfo {
735                            info: info.clone(),
736                            snapshot_backfill_info: snapshot_backfill_info_clone.clone(),
737                            cross_db_snapshot_backfill_info,
738                            resolved_split_assignment: Default::default(),
739                            refresh_interval_sec: Some(refresh_interval_sec),
740                        },
741                        take(notifiers),
742                        snapshot_backfill_upstream_tables,
743                        snapshot_epoch,
744                        hummock_version_stats,
745                        partial_graph_manager,
746                        &logical,
747                        worker_nodes,
748                        refresh_interval_sec,
749                    )?;
750
751                    if let Some(fragment_infos) = job.fragment_infos() {
752                        self.database_info.shared_actor_infos.upsert(
753                            self.database_id,
754                            fragment_infos.values().map(|f| (f, job_id)),
755                        );
756                    }
757
758                    self.independent_checkpoint_job_controls
759                        .insert(job_id, IndependentCheckpointJobControl::BatchRefresh(job));
760
761                    // Register permanent subscriber (never unregistered until MV is dropped)
762                    for upstream_mv_table_id in snapshot_backfill_info_clone
763                        .upstream_mv_table_id_to_backfill_epoch
764                        .keys()
765                    {
766                        self.database_info.register_subscriber(
767                            upstream_mv_table_id.as_job_id(),
768                            info.streaming_job.id().as_subscriber_id(),
769                            SubscriberType::SnapshotBackfill,
770                        );
771                    }
772
773                    let (table_ids, node_actors) = self.collect_base_info();
774                    (
775                        Some(mutation),
776                        table_ids,
777                        None,
778                        node_actors,
779                        PostCollectCommand::barrier(),
780                    )
781                }
782            }
783            Some(Command::CreateStreamingJob {
784                mut info,
785                job_type,
786                cross_db_snapshot_backfill_info,
787            }) => {
788                let ensembles = resolve_no_shuffle_ensembles(
789                    &info.stream_job_fragments,
790                    &info.upstream_fragment_downstreams,
791                )?;
792                let actors = render_actors(
793                    &info.stream_job_fragments,
794                    &self.database_info,
795                    &info.definition,
796                    &info.stream_job_fragments.inner.ctx,
797                    &info.streaming_job_model,
798                    partial_graph_manager
799                        .control_stream_manager()
800                        .env
801                        .actor_id_generator(),
802                    worker_nodes,
803                    &ensembles,
804                    &info.database_resource_group,
805                )?;
806                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
807                    fill_snapshot_backfill_epoch(
808                        &mut fragment.nodes,
809                        None,
810                        &cross_db_snapshot_backfill_info,
811                    )?;
812                }
813
814                // Build edges
815                let new_upstream_sink =
816                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
817                        Some(ctx)
818                    } else {
819                        None
820                    };
821
822                let mut edges = self.database_info.build_edge(
823                    Some((&info, false)),
824                    None,
825                    new_upstream_sink,
826                    partial_graph_manager.control_stream_manager(),
827                    &actors.stream_actors,
828                    &actors.actor_location,
829                );
830                // Phase 2: Resolve source-level DiscoveredSplits to actor-level SplitAssignment
831                let resolved_split_assignment = resolve_source_splits(
832                    &info,
833                    &actors,
834                    edges.actor_new_no_shuffle(),
835                    &self.database_info,
836                )?;
837
838                // Pre-apply: add new job and fragments
839                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
840                    let (fragment, _) =
841                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
842                            .expect("should have parallel cdc fragment");
843                    Some(CdcTableBackfillTracker::new(
844                        fragment.fragment_id,
845                        splits.clone(),
846                    ))
847                } else {
848                    None
849                };
850                self.database_info
851                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
852                self.database_info.pre_apply_new_fragments(
853                    info.stream_job_fragments
854                        .new_fragment_info(
855                            &actors.stream_actors,
856                            &actors.actor_location,
857                            &resolved_split_assignment,
858                        )
859                        .map(|(fragment_id, fragment_infos)| {
860                            (fragment_id, info.streaming_job.id(), fragment_infos)
861                        }),
862                );
863                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
864                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
865                    self.database_info.pre_apply_add_node_upstream(
866                        downstream_fragment_id,
867                        &PbUpstreamSinkInfo {
868                            upstream_fragment_id: ctx.sink_fragment_id,
869                            sink_output_schema: ctx.sink_output_fields.clone(),
870                            project_exprs: ctx.project_exprs.clone(),
871                        },
872                    );
873                }
874
875                let (table_ids, node_actors) = self.collect_base_info();
876
877                // Actors to create
878                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
879                    &info,
880                    &mut edges,
881                    &actors.stream_actors,
882                    &actors.actor_location,
883                ));
884
885                // CDC table snapshot splits
886                let actor_cdc_table_snapshot_splits = self
887                    .database_info
888                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
889
890                // Mutation
891                let is_currently_paused = self.state.is_paused();
892                let mutation = Command::create_streaming_job_to_mutation(
893                    &info,
894                    &job_type,
895                    is_currently_paused,
896                    &mut edges,
897                    partial_graph_manager.control_stream_manager(),
898                    actor_cdc_table_snapshot_splits,
899                    &resolved_split_assignment,
900                    &actors.stream_actors,
901                    &actors.actor_location,
902                )?;
903
904                (
905                    Some(mutation),
906                    table_ids,
907                    actors_to_create,
908                    node_actors,
909                    PostCollectCommand::CreateStreamingJob {
910                        info,
911                        job_type,
912                        cross_db_snapshot_backfill_info,
913                        resolved_split_assignment,
914                    },
915                )
916            }
917
918            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
919
920            Some(Command::Pause) => {
921                let prev_is_paused = self.state.is_paused();
922                self.state.set_is_paused(true);
923                let mutation = Command::pause_to_mutation(prev_is_paused);
924                let (table_ids, node_actors) = self.collect_base_info();
925                (
926                    mutation,
927                    table_ids,
928                    None,
929                    node_actors,
930                    PostCollectCommand::Command("Pause".to_owned()),
931                )
932            }
933
934            Some(Command::Resume) => {
935                let prev_is_paused = self.state.is_paused();
936                self.state.set_is_paused(false);
937                let mutation = Command::resume_to_mutation(prev_is_paused);
938                let (table_ids, node_actors) = self.collect_base_info();
939                (
940                    mutation,
941                    table_ids,
942                    None,
943                    node_actors,
944                    PostCollectCommand::Command("Resume".to_owned()),
945                )
946            }
947
948            Some(Command::Throttle { jobs, config }) => {
949                let mutation = Some(Command::throttle_to_mutation(&config));
950                for (fragment_id, throttle_config) in &config {
951                    self.database_info
952                        .pre_apply_throttle(*fragment_id, throttle_config);
953                }
954                throttle_for_creating_jobs = Some((jobs, config));
955                self.apply_simple_command(mutation, "Throttle")
956            }
957
958            Some(Command::DropStreamingJobs {
959                streaming_job_ids,
960                unregistered_state_table_ids: _,
961                dropped_sink_fragment_by_targets,
962            }) => {
963                // pre_apply: drop node upstream for sink targets
964                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
965                    self.database_info
966                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
967                }
968
969                let (table_ids, node_actors) = self.collect_base_info();
970
971                let mut actors = Vec::new();
972                for job_id in streaming_job_ids {
973                    let Some(job) = self.database_info.post_apply_remove_job(job_id) else {
974                        warn!(
975                            %job_id,
976                            "skip drop payload for streaming job that has already been removed from barrier worker"
977                        );
978                        continue;
979                    };
980
981                    for fragment in job.fragment_infos.values() {
982                        actors.extend(fragment.actors.keys().copied());
983                    }
984                }
985
986                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
987                    &actors,
988                    &dropped_sink_fragment_by_targets,
989                ));
990                (
991                    mutation,
992                    table_ids,
993                    None,
994                    node_actors,
995                    PostCollectCommand::DropStreamingJobs,
996                )
997            }
998
999            Some(Command::RescheduleIntent {
1000                reschedule_plan, ..
1001            }) => {
1002                let ReschedulePlan {
1003                    reschedules,
1004                    fragment_actors,
1005                } = reschedule_plan
1006                    .as_ref()
1007                    .expect("reschedule intent should be resolved in global barrier worker");
1008
1009                // Pre-apply: reschedule fragments
1010                for (fragment_id, reschedule) in reschedules {
1011                    self.database_info.pre_apply_reschedule(
1012                        *fragment_id,
1013                        reschedule
1014                            .added_actors
1015                            .iter()
1016                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
1017                                actors.iter().map(|actor_id| {
1018                                    (
1019                                        *actor_id,
1020                                        InflightActorInfo {
1021                                            worker_id: *node_id,
1022                                            vnode_bitmap: reschedule
1023                                                .newly_created_actors
1024                                                .get(actor_id)
1025                                                .expect("should exist")
1026                                                .0
1027                                                .0
1028                                                .vnode_bitmap
1029                                                .clone(),
1030                                            splits: reschedule
1031                                                .actor_splits
1032                                                .get(actor_id)
1033                                                .cloned()
1034                                                .unwrap_or_default(),
1035                                        },
1036                                    )
1037                                })
1038                            })
1039                            .collect(),
1040                        reschedule
1041                            .vnode_bitmap_updates
1042                            .iter()
1043                            .filter(|(actor_id, _)| {
1044                                !reschedule.newly_created_actors.contains_key(*actor_id)
1045                            })
1046                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
1047                            .collect(),
1048                        reschedule.actor_splits.clone(),
1049                    );
1050                }
1051
1052                let (table_ids, node_actors) = self.collect_base_info();
1053
1054                // Actors to create
1055                let actors_to_create = Some(Command::reschedule_actors_to_create(
1056                    reschedules,
1057                    fragment_actors,
1058                    &self.database_info,
1059                    partial_graph_manager.control_stream_manager(),
1060                ));
1061
1062                // Post-apply: remove old actors
1063                self.database_info
1064                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
1065                        (
1066                            *fragment_id,
1067                            reschedule.removed_actors.iter().cloned().collect(),
1068                        )
1069                    }));
1070
1071                // Mutation
1072                let mutation = Command::reschedule_to_mutation(
1073                    reschedules,
1074                    fragment_actors,
1075                    partial_graph_manager.control_stream_manager(),
1076                    &mut self.database_info,
1077                )?;
1078
1079                let reschedules = reschedule_plan
1080                    .expect("reschedule intent should be resolved in global barrier worker")
1081                    .reschedules;
1082                (
1083                    mutation,
1084                    table_ids,
1085                    actors_to_create,
1086                    node_actors,
1087                    PostCollectCommand::Reschedule { reschedules },
1088                )
1089            }
1090
1091            Some(Command::ReplaceStreamJob(plan)) => {
1092                let ensembles = resolve_no_shuffle_ensembles(
1093                    &plan.new_fragments,
1094                    &plan.upstream_fragment_downstreams,
1095                )?;
1096                let mut render_result = render_actors(
1097                    &plan.new_fragments,
1098                    &self.database_info,
1099                    "", // replace jobs don't need mview definition
1100                    &plan.new_fragments.inner.ctx,
1101                    &plan.streaming_job_model,
1102                    partial_graph_manager
1103                        .control_stream_manager()
1104                        .env
1105                        .actor_id_generator(),
1106                    worker_nodes,
1107                    &ensembles,
1108                    &plan.database_resource_group,
1109                )?;
1110
1111                // Render actors for auto_refresh_schema_sinks.
1112                // Each sink's new_fragment inherits parallelism from its original_fragment.
1113                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1114                    let actor_id_counter = partial_graph_manager
1115                        .control_stream_manager()
1116                        .env
1117                        .actor_id_generator();
1118                    for sink_ctx in sinks {
1119                        let original_fragment_id = sink_ctx.original_fragment.fragment_id;
1120                        let original_frag_info = self.database_info.fragment(original_fragment_id);
1121                        let actor_template = EnsembleActorTemplate::from_existing_inflight_fragment(
1122                            original_frag_info,
1123                        );
1124                        let new_aligner = ComponentFragmentAligner::new_persistent(
1125                            &actor_template,
1126                            actor_id_counter,
1127                        );
1128                        let distribution_type: DistributionType =
1129                            sink_ctx.new_fragment.distribution_type.into();
1130                        let actor_assignments =
1131                            new_aligner.align_component_actor(distribution_type);
1132                        let new_fragment_id = sink_ctx.new_fragment.fragment_id;
1133                        let mut actors = Vec::with_capacity(actor_assignments.len());
1134                        for (&actor_id, (worker_id, vnode_bitmap)) in &actor_assignments {
1135                            render_result.actor_location.insert(actor_id, *worker_id);
1136                            actors.push(StreamActor {
1137                                actor_id,
1138                                fragment_id: new_fragment_id,
1139                                vnode_bitmap: vnode_bitmap.clone(),
1140                                mview_definition: String::new(),
1141                                expr_context: Some(sink_ctx.ctx.to_expr_context()),
1142                                config_override: sink_ctx.ctx.config_override.clone(),
1143                            });
1144                        }
1145                        render_result.stream_actors.insert(new_fragment_id, actors);
1146                    }
1147                }
1148
1149                // Build edges first (needed for no-shuffle mapping used in split resolution)
1150                let mut edges = self.database_info.build_edge(
1151                    None,
1152                    Some(&plan),
1153                    None,
1154                    partial_graph_manager.control_stream_manager(),
1155                    &render_result.stream_actors,
1156                    &render_result.actor_location,
1157                );
1158
1159                // Phase 2: Resolve splits to actor-level assignment.
1160                let fragment_actor_ids: HashMap<FragmentId, Vec<ActorId>> = render_result
1161                    .stream_actors
1162                    .iter()
1163                    .map(|(fragment_id, actors)| {
1164                        (
1165                            *fragment_id,
1166                            actors.iter().map(|a| a.actor_id).collect::<Vec<_>>(),
1167                        )
1168                    })
1169                    .collect();
1170                let resolved_split_assignment = match &plan.split_plan {
1171                    ReplaceJobSplitPlan::Discovered(discovered) => {
1172                        SourceManager::resolve_fragment_to_actor_splits(
1173                            &plan.new_fragments,
1174                            discovered,
1175                            &fragment_actor_ids,
1176                        )?
1177                    }
1178                    ReplaceJobSplitPlan::AlignFromPrevious => {
1179                        SourceManager::resolve_replace_source_splits(
1180                            &plan.new_fragments,
1181                            &plan.replace_upstream,
1182                            edges.actor_new_no_shuffle(),
1183                            |_fragment_id, actor_id| {
1184                                self.database_info.fragment_infos().find_map(|fragment| {
1185                                    fragment
1186                                        .actors
1187                                        .get(&actor_id)
1188                                        .map(|info| info.splits.clone())
1189                                })
1190                            },
1191                        )?
1192                    }
1193                };
1194
1195                // Pre-apply: add new fragments and replace upstream
1196                self.database_info.pre_apply_new_fragments(
1197                    plan.new_fragments
1198                        .new_fragment_info(
1199                            &render_result.stream_actors,
1200                            &render_result.actor_location,
1201                            &resolved_split_assignment,
1202                        )
1203                        .map(|(fragment_id, new_fragment)| {
1204                            (fragment_id, plan.streaming_job.id(), new_fragment)
1205                        }),
1206                );
1207                for (fragment_id, replace_map) in &plan.replace_upstream {
1208                    self.database_info
1209                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
1210                }
1211                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1212                    self.database_info
1213                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
1214                            (
1215                                sink.new_fragment.fragment_id,
1216                                sink.original_sink.id.as_job_id(),
1217                                sink.new_fragment_info(
1218                                    &render_result.stream_actors,
1219                                    &render_result.actor_location,
1220                                ),
1221                            )
1222                        }));
1223                }
1224
1225                let (table_ids, node_actors) = self.collect_base_info();
1226
1227                // Actors to create
1228                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
1229                    &plan,
1230                    &mut edges,
1231                    &self.database_info,
1232                    &render_result.stream_actors,
1233                    &render_result.actor_location,
1234                ));
1235
1236                // Mutation (must be generated before removing old fragments,
1237                // because it reads actor info from database_info)
1238                let mutation = Command::replace_stream_job_to_mutation(
1239                    &plan,
1240                    &mut edges,
1241                    &mut self.database_info,
1242                    &resolved_split_assignment,
1243                )?;
1244
1245                // Post-apply: remove old fragments
1246                {
1247                    let mut fragment_ids_to_remove: Vec<_> = plan
1248                        .old_fragments
1249                        .fragments
1250                        .values()
1251                        .map(|f| f.fragment_id)
1252                        .collect();
1253                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
1254                        fragment_ids_to_remove
1255                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
1256                    }
1257                    self.database_info
1258                        .post_apply_remove_fragments(fragment_ids_to_remove);
1259                }
1260
1261                (
1262                    mutation,
1263                    table_ids,
1264                    actors_to_create,
1265                    node_actors,
1266                    PostCollectCommand::ReplaceStreamJob {
1267                        plan,
1268                        resolved_split_assignment,
1269                    },
1270                )
1271            }
1272
1273            Some(Command::SourceChangeSplit(split_state)) => {
1274                // Pre-apply: split assignments
1275                self.database_info.pre_apply_split_assignments(
1276                    split_state
1277                        .split_assignment
1278                        .iter()
1279                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
1280                );
1281
1282                let mutation = Some(Command::source_change_split_to_mutation(
1283                    &split_state.split_assignment,
1284                ));
1285                let (table_ids, node_actors) = self.collect_base_info();
1286                (
1287                    mutation,
1288                    table_ids,
1289                    None,
1290                    node_actors,
1291                    PostCollectCommand::SourceChangeSplit {
1292                        split_assignment: split_state.split_assignment,
1293                    },
1294                )
1295            }
1296
1297            Some(Command::CreateSubscription {
1298                subscription_id,
1299                upstream_mv_table_id,
1300                retention_second,
1301            }) => {
1302                self.database_info.register_subscriber(
1303                    upstream_mv_table_id.as_job_id(),
1304                    subscription_id.as_subscriber_id(),
1305                    SubscriberType::Subscription(retention_second),
1306                );
1307                let mutation = Some(Command::create_subscription_to_mutation(
1308                    upstream_mv_table_id,
1309                    subscription_id,
1310                ));
1311                let (table_ids, node_actors) = self.collect_base_info();
1312                (
1313                    mutation,
1314                    table_ids,
1315                    None,
1316                    node_actors,
1317                    PostCollectCommand::CreateSubscription { subscription_id },
1318                )
1319            }
1320
1321            Some(Command::DropSubscription {
1322                subscription_id,
1323                upstream_mv_table_id,
1324            }) => {
1325                if self
1326                    .database_info
1327                    .unregister_subscriber(
1328                        upstream_mv_table_id.as_job_id(),
1329                        subscription_id.as_subscriber_id(),
1330                    )
1331                    .is_none()
1332                {
1333                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
1334                }
1335                let mutation = Some(Command::drop_subscription_to_mutation(
1336                    upstream_mv_table_id,
1337                    subscription_id,
1338                ));
1339                let (table_ids, node_actors) = self.collect_base_info();
1340                (
1341                    mutation,
1342                    table_ids,
1343                    None,
1344                    node_actors,
1345                    PostCollectCommand::Command("DropSubscription".to_owned()),
1346                )
1347            }
1348
1349            Some(Command::AlterSubscriptionRetention {
1350                subscription_id,
1351                upstream_mv_table_id,
1352                retention_second,
1353            }) => {
1354                self.database_info.update_subscription_retention(
1355                    upstream_mv_table_id.as_job_id(),
1356                    subscription_id.as_subscriber_id(),
1357                    retention_second,
1358                );
1359                self.apply_simple_command(None, "AlterSubscriptionRetention")
1360            }
1361
1362            Some(Command::ConnectorPropsChange(config)) => {
1363                let mutation = Some(Command::connector_props_change_to_mutation(&config));
1364                let (table_ids, node_actors) = self.collect_base_info();
1365                (
1366                    mutation,
1367                    table_ids,
1368                    None,
1369                    node_actors,
1370                    PostCollectCommand::ConnectorPropsChange(config),
1371                )
1372            }
1373
1374            Some(Command::Refresh {
1375                table_id,
1376                associated_source_id,
1377            }) => {
1378                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
1379                self.apply_simple_command(mutation, "Refresh")
1380            }
1381
1382            Some(Command::ListFinish {
1383                table_id: _,
1384                associated_source_id,
1385            }) => {
1386                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
1387                self.apply_simple_command(mutation, "ListFinish")
1388            }
1389
1390            Some(Command::LoadFinish {
1391                table_id: _,
1392                associated_source_id,
1393            }) => {
1394                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
1395                self.apply_simple_command(mutation, "LoadFinish")
1396            }
1397
1398            Some(Command::ResetSource { source_id }) => {
1399                let mutation = Some(Command::reset_source_to_mutation(source_id));
1400                self.apply_simple_command(mutation, "ResetSource")
1401            }
1402
1403            Some(Command::ResumeBackfill { target }) => {
1404                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
1405                let (table_ids, node_actors) = self.collect_base_info();
1406                (
1407                    mutation,
1408                    table_ids,
1409                    None,
1410                    node_actors,
1411                    PostCollectCommand::ResumeBackfill { target },
1412                )
1413            }
1414
1415            Some(Command::InjectSourceOffsets {
1416                source_id,
1417                split_offsets,
1418            }) => {
1419                let mutation = Some(Command::inject_source_offsets_to_mutation(
1420                    source_id,
1421                    &split_offsets,
1422                ));
1423                self.apply_simple_command(mutation, "InjectSourceOffsets")
1424            }
1425        };
1426
1427        let mut finished_snapshot_backfill_jobs = HashSet::new();
1428        let mutation = match mutation {
1429            Some(mutation) => Some(mutation),
1430            None => {
1431                let mut finished_snapshot_backfill_job_info = HashMap::new();
1432                if barrier_info.kind.is_checkpoint() {
1433                    for (&job_id, job) in &mut self.independent_checkpoint_job_controls {
1434                        if let IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) =
1435                            job
1436                            && creating_job.should_merge_to_upstream()
1437                        {
1438                            let info = creating_job
1439                                .start_consume_upstream(partial_graph_manager, &barrier_info)?;
1440                            finished_snapshot_backfill_job_info
1441                                .try_insert(job_id, info)
1442                                .expect("non-duplicated");
1443                        }
1444                    }
1445                }
1446
1447                if !finished_snapshot_backfill_job_info.is_empty() {
1448                    let actors_to_create = actors_to_create.get_or_insert_default();
1449                    let mut subscriptions_to_drop = vec![];
1450                    let mut dispatcher_update = vec![];
1451                    let mut actor_splits = HashMap::new();
1452                    for (job_id, info) in finished_snapshot_backfill_job_info {
1453                        finished_snapshot_backfill_jobs.insert(job_id);
1454                        subscriptions_to_drop.extend(
1455                            info.snapshot_backfill_upstream_tables.iter().map(
1456                                |upstream_table_id| PbSubscriptionUpstreamInfo {
1457                                    subscriber_id: job_id.as_subscriber_id(),
1458                                    upstream_mv_table_id: *upstream_table_id,
1459                                },
1460                            ),
1461                        );
1462                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
1463                            assert_matches!(
1464                                self.database_info.unregister_subscriber(
1465                                    upstream_mv_table_id.as_job_id(),
1466                                    job_id.as_subscriber_id()
1467                                ),
1468                                Some(SubscriberType::SnapshotBackfill)
1469                            );
1470                        }
1471
1472                        table_ids_to_commit.extend(
1473                            info.fragment_infos
1474                                .values()
1475                                .flat_map(|fragment| fragment.state_table_ids.iter())
1476                                .copied(),
1477                        );
1478
1479                        let actor_len = info
1480                            .fragment_infos
1481                            .values()
1482                            .map(|fragment| fragment.actors.len() as u64)
1483                            .sum();
1484                        let id_gen = GlobalActorIdGen::new(
1485                            partial_graph_manager
1486                                .control_stream_manager()
1487                                .env
1488                                .actor_id_generator(),
1489                            actor_len,
1490                        );
1491                        let mut next_local_actor_id = 0;
1492                        // mapping from old_actor_id to new_actor_id
1493                        let actor_mapping: HashMap<_, _> = info
1494                            .fragment_infos
1495                            .values()
1496                            .flat_map(|fragment| fragment.actors.keys())
1497                            .map(|old_actor_id| {
1498                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
1499                                next_local_actor_id += 1;
1500                                (*old_actor_id, new_actor_id.as_global_id())
1501                            })
1502                            .collect();
1503                        let actor_mapping = &actor_mapping;
1504                        let new_stream_actors: HashMap<_, _> = info
1505                            .stream_actors
1506                            .into_iter()
1507                            .map(|(old_actor_id, mut actor)| {
1508                                let new_actor_id = actor_mapping[&old_actor_id];
1509                                actor.actor_id = new_actor_id;
1510                                (new_actor_id, actor)
1511                            })
1512                            .collect();
1513                        let new_fragment_info: HashMap<_, _> = info
1514                            .fragment_infos
1515                            .into_iter()
1516                            .map(|(fragment_id, mut fragment)| {
1517                                let actors = take(&mut fragment.actors);
1518                                fragment.actors = actors
1519                                    .into_iter()
1520                                    .map(|(old_actor_id, actor)| {
1521                                        let new_actor_id = actor_mapping[&old_actor_id];
1522                                        (new_actor_id, actor)
1523                                    })
1524                                    .collect();
1525                                (fragment_id, fragment)
1526                            })
1527                            .collect();
1528                        actor_splits.extend(
1529                            new_fragment_info
1530                                .values()
1531                                .flat_map(|fragment| &fragment.actors)
1532                                .map(|(actor_id, actor)| {
1533                                    (
1534                                        *actor_id,
1535                                        ConnectorSplits {
1536                                            splits: actor
1537                                                .splits
1538                                                .iter()
1539                                                .map(ConnectorSplit::from)
1540                                                .collect(),
1541                                        },
1542                                    )
1543                                }),
1544                        );
1545                        // new actors belong to the database partial graph
1546                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
1547                        let mut edge_builder = FragmentEdgeBuilder::new(
1548                            info.upstream_fragment_downstreams
1549                                .keys()
1550                                .map(|upstream_fragment_id| {
1551                                    self.database_info.fragment(*upstream_fragment_id)
1552                                })
1553                                .chain(new_fragment_info.values())
1554                                .map(|fragment| {
1555                                    (
1556                                        fragment.fragment_id,
1557                                        EdgeBuilderFragmentInfo::from_inflight(
1558                                            fragment,
1559                                            partial_graph_id,
1560                                            partial_graph_manager.control_stream_manager(),
1561                                        ),
1562                                    )
1563                                }),
1564                        );
1565                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
1566                        edge_builder.add_relations(&info.downstreams);
1567                        let mut edges = edge_builder.build();
1568                        let new_actors_to_create = edges.collect_actors_to_create(
1569                            new_fragment_info.values().map(|fragment| {
1570                                (
1571                                    fragment.fragment_id,
1572                                    &fragment.nodes,
1573                                    fragment.actors.iter().map(|(actor_id, actor)| {
1574                                        (&new_stream_actors[actor_id], actor.worker_id)
1575                                    }),
1576                                    [], // no initial subscriber for backfilling job
1577                                )
1578                            }),
1579                        );
1580                        dispatcher_update.extend(
1581                            info.upstream_fragment_downstreams.keys().flat_map(
1582                                |upstream_fragment_id| {
1583                                    let new_actor_dispatchers = edges
1584                                        .dispatchers
1585                                        .remove(upstream_fragment_id)
1586                                        .expect("should exist");
1587                                    new_actor_dispatchers.into_iter().flat_map(
1588                                        |(upstream_actor_id, dispatchers)| {
1589                                            dispatchers.into_iter().map(move |dispatcher| {
1590                                                PbDispatcherUpdate {
1591                                                    actor_id: upstream_actor_id,
1592                                                    dispatcher_id: dispatcher.dispatcher_id,
1593                                                    hash_mapping: dispatcher.hash_mapping,
1594                                                    removed_downstream_actor_id: dispatcher
1595                                                        .downstream_actor_id
1596                                                        .iter()
1597                                                        .map(|new_downstream_actor_id| {
1598                                                            actor_mapping
1599                                                            .iter()
1600                                                            .find_map(
1601                                                                |(old_actor_id, new_actor_id)| {
1602                                                                    (new_downstream_actor_id
1603                                                                        == new_actor_id)
1604                                                                        .then_some(*old_actor_id)
1605                                                                },
1606                                                            )
1607                                                            .expect("should exist")
1608                                                        })
1609                                                        .collect(),
1610                                                    added_downstream_actor_id: dispatcher
1611                                                        .downstream_actor_id,
1612                                                }
1613                                            })
1614                                        },
1615                                    )
1616                                },
1617                            ),
1618                        );
1619                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
1620                        for (worker_id, worker_actors) in new_actors_to_create {
1621                            node_actors.entry(worker_id).or_default().extend(
1622                                worker_actors.values().flat_map(|(_, actors, _)| {
1623                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
1624                                }),
1625                            );
1626                            actors_to_create
1627                                .entry(worker_id)
1628                                .or_default()
1629                                .extend(worker_actors);
1630                        }
1631                        self.database_info.add_existing(InflightStreamingJobInfo {
1632                            job_id,
1633                            fragment_infos: new_fragment_info,
1634                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
1635                            status: CreateStreamingJobStatus::Created,
1636                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
1637                        });
1638                    }
1639
1640                    Some(PbMutation::Update(PbUpdateMutation {
1641                        dispatcher_update,
1642                        merge_update: vec![], // no upstream update on existing actors
1643                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
1644                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
1645                        actor_splits,
1646                        actor_new_dispatchers: Default::default(), // no new dispatcher
1647                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
1648                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
1649                        subscriptions_to_drop,
1650                    }))
1651                } else {
1652                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
1653                    if fragment_ids.is_empty() {
1654                        None
1655                    } else {
1656                        Some(PbMutation::StartFragmentBackfill(
1657                            PbStartFragmentBackfillMutation { fragment_ids },
1658                        ))
1659                    }
1660                }
1661            }
1662        };
1663
1664        // Forward barrier to independent job controls
1665        for (job_id, job) in &mut self.independent_checkpoint_job_controls {
1666            match job {
1667                IndependentCheckpointJobControl::CreatingStreamingJob(creating_job) => {
1668                    if !finished_snapshot_backfill_jobs.contains(job_id) {
1669                        let throttle_mutation = if let Some((ref jobs, ref config)) =
1670                            throttle_for_creating_jobs
1671                            && jobs.contains(job_id)
1672                        {
1673                            assert_eq!(
1674                                jobs.len(),
1675                                1,
1676                                "should not alter rate limit of snapshot backfill job with other jobs"
1677                            );
1678                            Some((
1679                                Mutation::Throttle(ThrottleMutation {
1680                                    fragment_throttle: config
1681                                        .iter()
1682                                        .map(|(fragment_id, config)| (*fragment_id, *config))
1683                                        .collect(),
1684                                }),
1685                                take(notifiers),
1686                            ))
1687                        } else {
1688                            None
1689                        };
1690                        creating_job.on_new_upstream_barrier(
1691                            partial_graph_manager,
1692                            &barrier_info,
1693                            throttle_mutation,
1694                        )?;
1695                    }
1696                }
1697                IndependentCheckpointJobControl::BatchRefresh(batch_refresh_job) => {
1698                    batch_refresh_job.on_new_upstream_barrier(
1699                        partial_graph_manager,
1700                        &barrier_info,
1701                        None, // no throttle mutation for batch refresh jobs
1702                    )?;
1703                }
1704            }
1705        }
1706
1707        partial_graph_manager.inject_barrier(
1708            to_partial_graph_id(self.database_id, None),
1709            mutation,
1710            &node_actors,
1711            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1712            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1713            actors_to_create,
1714            PartialGraphBarrierInfo::new(
1715                post_collect_command,
1716                barrier_info,
1717                take(notifiers),
1718                table_ids_to_commit,
1719            ),
1720        )?;
1721
1722        Ok(ApplyCommandInfo {
1723            jobs_to_wait: finished_snapshot_backfill_jobs,
1724        })
1725    }
1726}