risingwave_meta/barrier/checkpoint/
state.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::assert_matches::assert_matches;
16use std::collections::hash_map::Entry;
17use std::collections::{HashMap, HashSet};
18use std::mem::take;
19
20use risingwave_common::bail;
21use risingwave_common::catalog::TableId;
22use risingwave_common::id::JobId;
23use risingwave_common::util::epoch::Epoch;
24use risingwave_meta_model::WorkerId;
25use risingwave_pb::hummock::HummockVersionStats;
26use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
27use risingwave_pb::stream_plan::barrier_mutation::{Mutation, PbMutation};
28use risingwave_pb::stream_plan::throttle_mutation::ThrottleConfig;
29use risingwave_pb::stream_plan::update_mutation::PbDispatcherUpdate;
30use risingwave_pb::stream_plan::{
31    PbStartFragmentBackfillMutation, PbSubscriptionUpstreamInfo, PbUpdateMutation,
32    PbUpstreamSinkInfo, ThrottleMutation,
33};
34use tracing::warn;
35
36use crate::MetaResult;
37use crate::barrier::cdc_progress::CdcTableBackfillTracker;
38use crate::barrier::checkpoint::{CreatingStreamingJobControl, DatabaseCheckpointControl};
39use crate::barrier::command::{PostCollectCommand, ReschedulePlan};
40use crate::barrier::context::CreateSnapshotBackfillJobCommandInfo;
41use crate::barrier::edge_builder::FragmentEdgeBuilder;
42use crate::barrier::info::{
43    BarrierInfo, CreateStreamingJobStatus, InflightStreamingJobInfo, SubscriberType,
44};
45use crate::barrier::notifier::Notifier;
46use crate::barrier::partial_graph::PartialGraphManager;
47use crate::barrier::rpc::to_partial_graph_id;
48use crate::barrier::{BarrierKind, Command, CreateStreamingJobType, TracedEpoch};
49use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
50use crate::model::{ActorId, FragmentId, StreamJobActorsToCreate};
51use crate::stream::cdc::parallel_cdc_table_backfill_fragment;
52use crate::stream::{GlobalActorIdGen, fill_snapshot_backfill_epoch};
53
/// The latest state of `GlobalBarrierWorker` after injecting the latest barrier.
pub(in crate::barrier) struct BarrierWorkerState {
    /// The last sent `prev_epoch`
    ///
    /// There's no need to persist this field. On recovery, we will restore this from the latest
    /// committed snapshot in `HummockManager`.
    in_flight_prev_epoch: TracedEpoch,

    /// The `prev_epoch` of pending non checkpoint barriers
    // Drained (via `take`) into `BarrierKind::Checkpoint` whenever the next
    // checkpoint barrier is issued in `next_barrier_info`.
    pending_non_checkpoint_barriers: Vec<u64>,

    /// Whether the cluster is paused.
    // Toggled by `set_is_paused`, which logs only on an actual transition.
    is_paused: bool,
}
68
69impl BarrierWorkerState {
70    pub(super) fn new() -> Self {
71        Self {
72            in_flight_prev_epoch: TracedEpoch::new(Epoch::now()),
73            pending_non_checkpoint_barriers: vec![],
74            is_paused: false,
75        }
76    }
77
78    pub fn recovery(in_flight_prev_epoch: TracedEpoch, is_paused: bool) -> Self {
79        Self {
80            in_flight_prev_epoch,
81            pending_non_checkpoint_barriers: vec![],
82            is_paused,
83        }
84    }
85
86    pub fn is_paused(&self) -> bool {
87        self.is_paused
88    }
89
90    fn set_is_paused(&mut self, is_paused: bool) {
91        if self.is_paused != is_paused {
92            tracing::info!(
93                currently_paused = self.is_paused,
94                newly_paused = is_paused,
95                "update paused state"
96            );
97            self.is_paused = is_paused;
98        }
99    }
100
101    pub fn in_flight_prev_epoch(&self) -> &TracedEpoch {
102        &self.in_flight_prev_epoch
103    }
104
105    /// Returns the `BarrierInfo` for the next barrier, and updates the state.
106    pub fn next_barrier_info(
107        &mut self,
108        is_checkpoint: bool,
109        curr_epoch: TracedEpoch,
110    ) -> BarrierInfo {
111        assert!(
112            self.in_flight_prev_epoch.value() < curr_epoch.value(),
113            "curr epoch regress. {} > {}",
114            self.in_flight_prev_epoch.value(),
115            curr_epoch.value()
116        );
117        let prev_epoch = self.in_flight_prev_epoch.clone();
118        self.in_flight_prev_epoch = curr_epoch.clone();
119        self.pending_non_checkpoint_barriers
120            .push(prev_epoch.value().0);
121        let kind = if is_checkpoint {
122            let epochs = take(&mut self.pending_non_checkpoint_barriers);
123            BarrierKind::Checkpoint(epochs)
124        } else {
125            BarrierKind::Barrier
126        };
127        BarrierInfo {
128            prev_epoch,
129            curr_epoch,
130            kind,
131        }
132    }
133}
134
/// Aggregated output of `DatabaseCheckpointControl::apply_command`, handed back
/// to the caller once a command's mutation has been produced.
pub(super) struct ApplyCommandInfo {
    /// Max retention per upstream MV table for its subscriptions.
    // NOTE(review): the unit of the `u64` (likely seconds) is not visible in
    // this file — confirm at the producer.
    pub mv_subscription_max_retention: HashMap<TableId, u64>,
    /// State-table IDs to commit for this barrier (see `collect_base_info`).
    pub table_ids_to_commit: HashSet<TableId>,
    /// Creating streaming jobs whose completion this barrier must wait on.
    // NOTE(review): presumably snapshot-backfill jobs being merged — verify
    // against the consumer.
    pub jobs_to_wait: HashSet<JobId>,
    /// Command to execute after the barrier is collected.
    pub command: PostCollectCommand,
}
141
/// Result tuple of `apply_command`: mutation, table IDs to commit, actors to create,
/// node actors (per-worker actor IDs the barrier is collected from), and the
/// post-collect command.
type ApplyCommandResult = (
    Option<Mutation>,
    HashSet<TableId>,
    Option<StreamJobActorsToCreate>,
    HashMap<WorkerId, HashSet<ActorId>>,
    PostCollectCommand,
);
151
152impl DatabaseCheckpointControl {
153    /// Collect table IDs to commit and actor IDs to collect from current fragment infos.
154    fn collect_base_info(&self) -> (HashSet<TableId>, HashMap<WorkerId, HashSet<ActorId>>) {
155        let table_ids_to_commit = self.database_info.existing_table_ids().collect();
156        let node_actors =
157            InflightFragmentInfo::actor_ids_to_collect(self.database_info.fragment_infos());
158        (table_ids_to_commit, node_actors)
159    }
160
161    /// Helper for the simplest command variants: those that only need a
162    /// pre-computed mutation and a command name, with no actors to create
163    /// and no additional side effects on `self`.
164    fn apply_simple_command(
165        &self,
166        mutation: Option<Mutation>,
167        command_name: &'static str,
168    ) -> ApplyCommandResult {
169        let (table_ids, node_actors) = self.collect_base_info();
170        (
171            mutation,
172            table_ids,
173            None,
174            node_actors,
175            PostCollectCommand::Command(command_name.to_owned()),
176        )
177    }
178
179    /// Returns the inflight actor infos that have included the newly added actors in the given command. The dropped actors
180    /// will be removed from the state after the info get resolved.
181    pub(super) fn apply_command(
182        &mut self,
183        command: Option<Command>,
184        notifiers: &mut Vec<Notifier>,
185        barrier_info: &BarrierInfo,
186        partial_graph_manager: &mut PartialGraphManager,
187        hummock_version_stats: &HummockVersionStats,
188    ) -> MetaResult<ApplyCommandInfo> {
189        debug_assert!(
190            !matches!(
191                command,
192                Some(Command::RescheduleIntent {
193                    reschedule_plan: None,
194                    ..
195                })
196            ),
197            "reschedule intent must be resolved before apply"
198        );
199        if matches!(
200            command,
201            Some(Command::RescheduleIntent {
202                reschedule_plan: None,
203                ..
204            })
205        ) {
206            bail!("reschedule intent must be resolved before apply");
207        }
208        // Throttle data for creating jobs (set only in the Throttle arm)
209        let mut throttle_for_creating_jobs: Option<(
210            HashSet<JobId>,
211            HashMap<FragmentId, ThrottleConfig>,
212        )> = None;
213
214        // Each variant handles its own pre-apply, edge building, mutation generation,
215        // collect base info, and post-apply. The match produces values consumed by the
216        // common snapshot-backfill-merging code that follows.
217        let (
218            mutation,
219            mut table_ids_to_commit,
220            mut actors_to_create,
221            mut node_actors,
222            post_collect_command,
223        ) = match command {
224            None => self.apply_simple_command(None, "barrier"),
225            Some(Command::CreateStreamingJob {
226                mut info,
227                job_type: CreateStreamingJobType::SnapshotBackfill(mut snapshot_backfill_info),
228                cross_db_snapshot_backfill_info,
229            }) => {
230                {
231                    assert!(!self.state.is_paused());
232                    let snapshot_epoch = barrier_info.prev_epoch();
233                    // set snapshot epoch of upstream table for snapshot backfill
234                    for snapshot_backfill_epoch in snapshot_backfill_info
235                        .upstream_mv_table_id_to_backfill_epoch
236                        .values_mut()
237                    {
238                        assert_eq!(
239                            snapshot_backfill_epoch.replace(snapshot_epoch),
240                            None,
241                            "must not set previously"
242                        );
243                    }
244                    for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
245                        fill_snapshot_backfill_epoch(
246                            &mut fragment.nodes,
247                            Some(&snapshot_backfill_info),
248                            &cross_db_snapshot_backfill_info,
249                        )?;
250                    }
251                    let job_id = info.stream_job_fragments.stream_job_id();
252                    let snapshot_backfill_upstream_tables = snapshot_backfill_info
253                        .upstream_mv_table_id_to_backfill_epoch
254                        .keys()
255                        .cloned()
256                        .collect();
257
258                    let mut edges = self.database_info.build_edge(
259                        Some((&info, true)),
260                        None,
261                        None,
262                        partial_graph_manager.control_stream_manager(),
263                    );
264
265                    let Entry::Vacant(entry) = self.creating_streaming_job_controls.entry(job_id)
266                    else {
267                        panic!("duplicated creating snapshot backfill job {job_id}");
268                    };
269
270                    let job = CreatingStreamingJobControl::new(
271                        entry,
272                        CreateSnapshotBackfillJobCommandInfo {
273                            info: info.clone(),
274                            snapshot_backfill_info: snapshot_backfill_info.clone(),
275                            cross_db_snapshot_backfill_info,
276                        },
277                        take(notifiers),
278                        snapshot_backfill_upstream_tables,
279                        snapshot_epoch,
280                        hummock_version_stats,
281                        partial_graph_manager,
282                        &mut edges,
283                    )?;
284
285                    self.database_info
286                        .shared_actor_infos
287                        .upsert(self.database_id, job.fragment_infos_with_job_id());
288
289                    for upstream_mv_table_id in snapshot_backfill_info
290                        .upstream_mv_table_id_to_backfill_epoch
291                        .keys()
292                    {
293                        self.database_info.register_subscriber(
294                            upstream_mv_table_id.as_job_id(),
295                            info.streaming_job.id().as_subscriber_id(),
296                            SubscriberType::SnapshotBackfill,
297                        );
298                    }
299
300                    let mutation = Command::create_streaming_job_to_mutation(
301                        &info,
302                        &CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info),
303                        self.state.is_paused(),
304                        &mut edges,
305                        partial_graph_manager.control_stream_manager(),
306                        None,
307                    )?;
308
309                    let (table_ids, node_actors) = self.collect_base_info();
310                    (
311                        Some(mutation),
312                        table_ids,
313                        None,
314                        node_actors,
315                        PostCollectCommand::barrier(),
316                    )
317                }
318            }
319            Some(Command::CreateStreamingJob {
320                mut info,
321                job_type,
322                cross_db_snapshot_backfill_info,
323            }) => {
324                for fragment in info.stream_job_fragments.inner.fragments.values_mut() {
325                    fill_snapshot_backfill_epoch(
326                        &mut fragment.nodes,
327                        None,
328                        &cross_db_snapshot_backfill_info,
329                    )?;
330                }
331
332                // Build edges
333                let new_upstream_sink =
334                    if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
335                        Some(ctx)
336                    } else {
337                        None
338                    };
339                let mut edges = self.database_info.build_edge(
340                    Some((&info, false)),
341                    None,
342                    new_upstream_sink,
343                    partial_graph_manager.control_stream_manager(),
344                );
345
346                // Pre-apply: add new job and fragments
347                let cdc_tracker = if let Some(splits) = &info.cdc_table_snapshot_splits {
348                    let (fragment, _) =
349                        parallel_cdc_table_backfill_fragment(info.stream_job_fragments.fragments())
350                            .expect("should have parallel cdc fragment");
351                    Some(CdcTableBackfillTracker::new(
352                        fragment.fragment_id,
353                        splits.clone(),
354                    ))
355                } else {
356                    None
357                };
358                self.database_info
359                    .pre_apply_new_job(info.streaming_job.id(), cdc_tracker);
360                self.database_info.pre_apply_new_fragments(
361                    info.stream_job_fragments
362                        .new_fragment_info(&info.init_split_assignment)
363                        .map(|(fragment_id, fragment_infos)| {
364                            (fragment_id, info.streaming_job.id(), fragment_infos)
365                        }),
366                );
367                if let CreateStreamingJobType::SinkIntoTable(ref ctx) = job_type {
368                    let downstream_fragment_id = ctx.new_sink_downstream.downstream_fragment_id;
369                    self.database_info.pre_apply_add_node_upstream(
370                        downstream_fragment_id,
371                        &PbUpstreamSinkInfo {
372                            upstream_fragment_id: ctx.sink_fragment_id,
373                            sink_output_schema: ctx.sink_output_fields.clone(),
374                            project_exprs: ctx.project_exprs.clone(),
375                        },
376                    );
377                }
378
379                let (table_ids, node_actors) = self.collect_base_info();
380
381                // Actors to create
382                let actors_to_create = Some(Command::create_streaming_job_actors_to_create(
383                    &info, &mut edges,
384                ));
385
386                // CDC table snapshot splits
387                let actor_cdc_table_snapshot_splits = self
388                    .database_info
389                    .assign_cdc_backfill_splits(info.stream_job_fragments.stream_job_id())?;
390
391                // Mutation
392                let is_currently_paused = self.state.is_paused();
393                let mutation = Command::create_streaming_job_to_mutation(
394                    &info,
395                    &job_type,
396                    is_currently_paused,
397                    &mut edges,
398                    partial_graph_manager.control_stream_manager(),
399                    actor_cdc_table_snapshot_splits,
400                )?;
401
402                (
403                    Some(mutation),
404                    table_ids,
405                    actors_to_create,
406                    node_actors,
407                    PostCollectCommand::CreateStreamingJob {
408                        info,
409                        job_type,
410                        cross_db_snapshot_backfill_info,
411                    },
412                )
413            }
414
415            Some(Command::Flush) => self.apply_simple_command(None, "Flush"),
416
417            Some(Command::Pause) => {
418                let prev_is_paused = self.state.is_paused();
419                self.state.set_is_paused(true);
420                let mutation = Command::pause_to_mutation(prev_is_paused);
421                let (table_ids, node_actors) = self.collect_base_info();
422                (
423                    mutation,
424                    table_ids,
425                    None,
426                    node_actors,
427                    PostCollectCommand::Command("Pause".to_owned()),
428                )
429            }
430
431            Some(Command::Resume) => {
432                let prev_is_paused = self.state.is_paused();
433                self.state.set_is_paused(false);
434                let mutation = Command::resume_to_mutation(prev_is_paused);
435                let (table_ids, node_actors) = self.collect_base_info();
436                (
437                    mutation,
438                    table_ids,
439                    None,
440                    node_actors,
441                    PostCollectCommand::Command("Resume".to_owned()),
442                )
443            }
444
445            Some(Command::Throttle { jobs, config }) => {
446                let mutation = Some(Command::throttle_to_mutation(&config));
447                throttle_for_creating_jobs = Some((jobs, config));
448                self.apply_simple_command(mutation, "Throttle")
449            }
450
451            Some(Command::DropStreamingJobs {
452                streaming_job_ids,
453                actors,
454                unregistered_state_table_ids,
455                unregistered_fragment_ids,
456                dropped_sink_fragment_by_targets,
457            }) => {
458                // pre_apply: drop node upstream for sink targets
459                for (target_fragment, sink_fragments) in &dropped_sink_fragment_by_targets {
460                    self.database_info
461                        .pre_apply_drop_node_upstream(*target_fragment, sink_fragments);
462                }
463
464                let (table_ids, node_actors) = self.collect_base_info();
465
466                // post_apply: remove fragments
467                self.database_info
468                    .post_apply_remove_fragments(unregistered_fragment_ids.iter().cloned());
469
470                let mutation = Some(Command::drop_streaming_jobs_to_mutation(
471                    &actors,
472                    &dropped_sink_fragment_by_targets,
473                ));
474                (
475                    mutation,
476                    table_ids,
477                    None,
478                    node_actors,
479                    PostCollectCommand::DropStreamingJobs {
480                        streaming_job_ids,
481                        unregistered_state_table_ids,
482                    },
483                )
484            }
485
486            Some(Command::RescheduleIntent {
487                reschedule_plan, ..
488            }) => {
489                let ReschedulePlan {
490                    reschedules,
491                    fragment_actors,
492                } = reschedule_plan
493                    .as_ref()
494                    .expect("reschedule intent should be resolved in global barrier worker");
495
496                // Pre-apply: reschedule fragments
497                for (fragment_id, reschedule) in reschedules {
498                    self.database_info.pre_apply_reschedule(
499                        *fragment_id,
500                        reschedule
501                            .added_actors
502                            .iter()
503                            .flat_map(|(node_id, actors): (&WorkerId, &Vec<ActorId>)| {
504                                actors.iter().map(|actor_id| {
505                                    (
506                                        *actor_id,
507                                        InflightActorInfo {
508                                            worker_id: *node_id,
509                                            vnode_bitmap: reschedule
510                                                .newly_created_actors
511                                                .get(actor_id)
512                                                .expect("should exist")
513                                                .0
514                                                .0
515                                                .vnode_bitmap
516                                                .clone(),
517                                            splits: reschedule
518                                                .actor_splits
519                                                .get(actor_id)
520                                                .cloned()
521                                                .unwrap_or_default(),
522                                        },
523                                    )
524                                })
525                            })
526                            .collect(),
527                        reschedule
528                            .vnode_bitmap_updates
529                            .iter()
530                            .filter(|(actor_id, _)| {
531                                !reschedule.newly_created_actors.contains_key(*actor_id)
532                            })
533                            .map(|(actor_id, bitmap)| (*actor_id, bitmap.clone()))
534                            .collect(),
535                        reschedule.actor_splits.clone(),
536                    );
537                }
538
539                let (table_ids, node_actors) = self.collect_base_info();
540
541                // Actors to create
542                let actors_to_create = Some(Command::reschedule_actors_to_create(
543                    reschedules,
544                    fragment_actors,
545                    &self.database_info,
546                    partial_graph_manager.control_stream_manager(),
547                ));
548
549                // Post-apply: remove old actors
550                self.database_info
551                    .post_apply_reschedules(reschedules.iter().map(|(fragment_id, reschedule)| {
552                        (
553                            *fragment_id,
554                            reschedule.removed_actors.iter().cloned().collect(),
555                        )
556                    }));
557
558                // Mutation
559                let mutation = Command::reschedule_to_mutation(
560                    reschedules,
561                    fragment_actors,
562                    partial_graph_manager.control_stream_manager(),
563                    &mut self.database_info,
564                )?;
565
566                let reschedules = reschedule_plan
567                    .expect("reschedule intent should be resolved in global barrier worker")
568                    .reschedules;
569                (
570                    mutation,
571                    table_ids,
572                    actors_to_create,
573                    node_actors,
574                    PostCollectCommand::Reschedule { reschedules },
575                )
576            }
577
578            Some(Command::ReplaceStreamJob(plan)) => {
579                // Build edges
580                let mut edges = self.database_info.build_edge(
581                    None,
582                    Some(&plan),
583                    None,
584                    partial_graph_manager.control_stream_manager(),
585                );
586
587                // Pre-apply: add new fragments and replace upstream
588                self.database_info.pre_apply_new_fragments(
589                    plan.new_fragments
590                        .new_fragment_info(&plan.init_split_assignment)
591                        .map(|(fragment_id, new_fragment)| {
592                            (fragment_id, plan.streaming_job.id(), new_fragment)
593                        }),
594                );
595                for (fragment_id, replace_map) in &plan.replace_upstream {
596                    self.database_info
597                        .pre_apply_replace_node_upstream(*fragment_id, replace_map);
598                }
599                if let Some(sinks) = &plan.auto_refresh_schema_sinks {
600                    self.database_info
601                        .pre_apply_new_fragments(sinks.iter().map(|sink| {
602                            (
603                                sink.new_fragment.fragment_id,
604                                sink.original_sink.id.as_job_id(),
605                                sink.new_fragment_info(),
606                            )
607                        }));
608                }
609
610                let (table_ids, node_actors) = self.collect_base_info();
611
612                // Actors to create
613                let actors_to_create = Some(Command::replace_stream_job_actors_to_create(
614                    &plan,
615                    &mut edges,
616                    &self.database_info,
617                ));
618
619                // Post-apply: remove old fragments
620                {
621                    let mut fragment_ids_to_remove: Vec<_> = plan
622                        .old_fragments
623                        .fragments
624                        .values()
625                        .map(|f| f.fragment_id)
626                        .collect();
627                    if let Some(sinks) = &plan.auto_refresh_schema_sinks {
628                        fragment_ids_to_remove
629                            .extend(sinks.iter().map(|sink| sink.original_fragment.fragment_id));
630                    }
631                    self.database_info
632                        .post_apply_remove_fragments(fragment_ids_to_remove);
633                }
634
635                // Mutation
636                let mutation = Command::replace_stream_job_to_mutation(
637                    &plan,
638                    &mut edges,
639                    &mut self.database_info,
640                )?;
641
642                (
643                    mutation,
644                    table_ids,
645                    actors_to_create,
646                    node_actors,
647                    PostCollectCommand::ReplaceStreamJob(plan),
648                )
649            }
650
651            Some(Command::SourceChangeSplit(split_state)) => {
652                // Pre-apply: split assignments
653                self.database_info.pre_apply_split_assignments(
654                    split_state
655                        .split_assignment
656                        .iter()
657                        .map(|(&fragment_id, splits)| (fragment_id, splits.clone())),
658                );
659
660                let mutation = Some(Command::source_change_split_to_mutation(
661                    &split_state.split_assignment,
662                ));
663                let (table_ids, node_actors) = self.collect_base_info();
664                (
665                    mutation,
666                    table_ids,
667                    None,
668                    node_actors,
669                    PostCollectCommand::SourceChangeSplit {
670                        split_assignment: split_state.split_assignment,
671                    },
672                )
673            }
674
675            Some(Command::CreateSubscription {
676                subscription_id,
677                upstream_mv_table_id,
678                retention_second,
679            }) => {
680                self.database_info.register_subscriber(
681                    upstream_mv_table_id.as_job_id(),
682                    subscription_id.as_subscriber_id(),
683                    SubscriberType::Subscription(retention_second),
684                );
685                let mutation = Some(Command::create_subscription_to_mutation(
686                    upstream_mv_table_id,
687                    subscription_id,
688                ));
689                let (table_ids, node_actors) = self.collect_base_info();
690                (
691                    mutation,
692                    table_ids,
693                    None,
694                    node_actors,
695                    PostCollectCommand::CreateSubscription { subscription_id },
696                )
697            }
698
699            Some(Command::DropSubscription {
700                subscription_id,
701                upstream_mv_table_id,
702            }) => {
703                if self
704                    .database_info
705                    .unregister_subscriber(
706                        upstream_mv_table_id.as_job_id(),
707                        subscription_id.as_subscriber_id(),
708                    )
709                    .is_none()
710                {
711                    warn!(%subscription_id, %upstream_mv_table_id, "no subscription to drop");
712                }
713                let mutation = Some(Command::drop_subscription_to_mutation(
714                    upstream_mv_table_id,
715                    subscription_id,
716                ));
717                let (table_ids, node_actors) = self.collect_base_info();
718                (
719                    mutation,
720                    table_ids,
721                    None,
722                    node_actors,
723                    PostCollectCommand::Command("DropSubscription".to_owned()),
724                )
725            }
726
727            Some(Command::AlterSubscriptionRetention {
728                subscription_id,
729                upstream_mv_table_id,
730                retention_second,
731            }) => {
732                self.database_info.update_subscription_retention(
733                    upstream_mv_table_id.as_job_id(),
734                    subscription_id.as_subscriber_id(),
735                    retention_second,
736                );
737                self.apply_simple_command(None, "AlterSubscriptionRetention")
738            }
739
740            Some(Command::ConnectorPropsChange(config)) => {
741                let mutation = Some(Command::connector_props_change_to_mutation(&config));
742                let (table_ids, node_actors) = self.collect_base_info();
743                (
744                    mutation,
745                    table_ids,
746                    None,
747                    node_actors,
748                    PostCollectCommand::ConnectorPropsChange(config),
749                )
750            }
751
752            Some(Command::Refresh {
753                table_id,
754                associated_source_id,
755            }) => {
756                let mutation = Some(Command::refresh_to_mutation(table_id, associated_source_id));
757                self.apply_simple_command(mutation, "Refresh")
758            }
759
760            Some(Command::ListFinish {
761                table_id: _,
762                associated_source_id,
763            }) => {
764                let mutation = Some(Command::list_finish_to_mutation(associated_source_id));
765                self.apply_simple_command(mutation, "ListFinish")
766            }
767
768            Some(Command::LoadFinish {
769                table_id: _,
770                associated_source_id,
771            }) => {
772                let mutation = Some(Command::load_finish_to_mutation(associated_source_id));
773                self.apply_simple_command(mutation, "LoadFinish")
774            }
775
776            Some(Command::ResetSource { source_id }) => {
777                let mutation = Some(Command::reset_source_to_mutation(source_id));
778                self.apply_simple_command(mutation, "ResetSource")
779            }
780
781            Some(Command::ResumeBackfill { target }) => {
782                let mutation = Command::resume_backfill_to_mutation(&target, &self.database_info)?;
783                let (table_ids, node_actors) = self.collect_base_info();
784                (
785                    mutation,
786                    table_ids,
787                    None,
788                    node_actors,
789                    PostCollectCommand::ResumeBackfill { target },
790                )
791            }
792
793            Some(Command::InjectSourceOffsets {
794                source_id,
795                split_offsets,
796            }) => {
797                let mutation = Some(Command::inject_source_offsets_to_mutation(
798                    source_id,
799                    &split_offsets,
800                ));
801                self.apply_simple_command(mutation, "InjectSourceOffsets")
802            }
803        };
804
805        let mut finished_snapshot_backfill_jobs = HashSet::new();
806        let mutation = match mutation {
807            Some(mutation) => Some(mutation),
808            None => {
809                let mut finished_snapshot_backfill_job_info = HashMap::new();
810                if barrier_info.kind.is_checkpoint() {
811                    for (&job_id, creating_job) in &mut self.creating_streaming_job_controls {
812                        if creating_job.should_merge_to_upstream() {
813                            let info = creating_job
814                                .start_consume_upstream(partial_graph_manager, barrier_info)?;
815                            finished_snapshot_backfill_job_info
816                                .try_insert(job_id, info)
817                                .expect("non-duplicated");
818                        }
819                    }
820                }
821
822                if !finished_snapshot_backfill_job_info.is_empty() {
823                    let actors_to_create = actors_to_create.get_or_insert_default();
824                    let mut subscriptions_to_drop = vec![];
825                    let mut dispatcher_update = vec![];
826                    let mut actor_splits = HashMap::new();
827                    for (job_id, info) in finished_snapshot_backfill_job_info {
828                        finished_snapshot_backfill_jobs.insert(job_id);
829                        subscriptions_to_drop.extend(
830                            info.snapshot_backfill_upstream_tables.iter().map(
831                                |upstream_table_id| PbSubscriptionUpstreamInfo {
832                                    subscriber_id: job_id.as_subscriber_id(),
833                                    upstream_mv_table_id: *upstream_table_id,
834                                },
835                            ),
836                        );
837                        for upstream_mv_table_id in &info.snapshot_backfill_upstream_tables {
838                            assert_matches!(
839                                self.database_info.unregister_subscriber(
840                                    upstream_mv_table_id.as_job_id(),
841                                    job_id.as_subscriber_id()
842                                ),
843                                Some(SubscriberType::SnapshotBackfill)
844                            );
845                        }
846
847                        table_ids_to_commit.extend(
848                            info.fragment_infos
849                                .values()
850                                .flat_map(|fragment| fragment.state_table_ids.iter())
851                                .copied(),
852                        );
853
854                        let actor_len = info
855                            .fragment_infos
856                            .values()
857                            .map(|fragment| fragment.actors.len() as u64)
858                            .sum();
859                        let id_gen = GlobalActorIdGen::new(
860                            partial_graph_manager
861                                .control_stream_manager()
862                                .env
863                                .actor_id_generator(),
864                            actor_len,
865                        );
866                        let mut next_local_actor_id = 0;
867                        // mapping from old_actor_id to new_actor_id
868                        let actor_mapping: HashMap<_, _> = info
869                            .fragment_infos
870                            .values()
871                            .flat_map(|fragment| fragment.actors.keys())
872                            .map(|old_actor_id| {
873                                let new_actor_id = id_gen.to_global_id(next_local_actor_id);
874                                next_local_actor_id += 1;
875                                (*old_actor_id, new_actor_id.as_global_id())
876                            })
877                            .collect();
878                        let actor_mapping = &actor_mapping;
879                        let new_stream_actors: HashMap<_, _> = info
880                            .stream_actors
881                            .into_iter()
882                            .map(|(old_actor_id, mut actor)| {
883                                let new_actor_id = actor_mapping[&old_actor_id];
884                                actor.actor_id = new_actor_id;
885                                (new_actor_id, actor)
886                            })
887                            .collect();
888                        let new_fragment_info: HashMap<_, _> = info
889                            .fragment_infos
890                            .into_iter()
891                            .map(|(fragment_id, mut fragment)| {
892                                let actors = take(&mut fragment.actors);
893                                fragment.actors = actors
894                                    .into_iter()
895                                    .map(|(old_actor_id, actor)| {
896                                        let new_actor_id = actor_mapping[&old_actor_id];
897                                        (new_actor_id, actor)
898                                    })
899                                    .collect();
900                                (fragment_id, fragment)
901                            })
902                            .collect();
903                        actor_splits.extend(
904                            new_fragment_info
905                                .values()
906                                .flat_map(|fragment| &fragment.actors)
907                                .map(|(actor_id, actor)| {
908                                    (
909                                        *actor_id,
910                                        ConnectorSplits {
911                                            splits: actor
912                                                .splits
913                                                .iter()
914                                                .map(ConnectorSplit::from)
915                                                .collect(),
916                                        },
917                                    )
918                                }),
919                        );
920                        // new actors belong to the database partial graph
921                        let partial_graph_id = to_partial_graph_id(self.database_id, None);
922                        let mut edge_builder = FragmentEdgeBuilder::new(
923                            info.upstream_fragment_downstreams
924                                .keys()
925                                .map(|upstream_fragment_id| {
926                                    self.database_info.fragment(*upstream_fragment_id)
927                                })
928                                .chain(new_fragment_info.values())
929                                .map(|info| (info, partial_graph_id)),
930                            partial_graph_manager.control_stream_manager(),
931                        );
932                        edge_builder.add_relations(&info.upstream_fragment_downstreams);
933                        edge_builder.add_relations(&info.downstreams);
934                        let mut edges = edge_builder.build();
935                        let new_actors_to_create = edges.collect_actors_to_create(
936                            new_fragment_info.values().map(|fragment| {
937                                (
938                                    fragment.fragment_id,
939                                    &fragment.nodes,
940                                    fragment.actors.iter().map(|(actor_id, actor)| {
941                                        (&new_stream_actors[actor_id], actor.worker_id)
942                                    }),
943                                    [], // no initial subscriber for backfilling job
944                                )
945                            }),
946                        );
947                        dispatcher_update.extend(
948                            info.upstream_fragment_downstreams.keys().flat_map(
949                                |upstream_fragment_id| {
950                                    let new_actor_dispatchers = edges
951                                        .dispatchers
952                                        .remove(upstream_fragment_id)
953                                        .expect("should exist");
954                                    new_actor_dispatchers.into_iter().flat_map(
955                                        |(upstream_actor_id, dispatchers)| {
956                                            dispatchers.into_iter().map(move |dispatcher| {
957                                                PbDispatcherUpdate {
958                                                    actor_id: upstream_actor_id,
959                                                    dispatcher_id: dispatcher.dispatcher_id,
960                                                    hash_mapping: dispatcher.hash_mapping,
961                                                    removed_downstream_actor_id: dispatcher
962                                                        .downstream_actor_id
963                                                        .iter()
964                                                        .map(|new_downstream_actor_id| {
965                                                            actor_mapping
966                                                            .iter()
967                                                            .find_map(
968                                                                |(old_actor_id, new_actor_id)| {
969                                                                    (new_downstream_actor_id
970                                                                        == new_actor_id)
971                                                                        .then_some(*old_actor_id)
972                                                                },
973                                                            )
974                                                            .expect("should exist")
975                                                        })
976                                                        .collect(),
977                                                    added_downstream_actor_id: dispatcher
978                                                        .downstream_actor_id,
979                                                }
980                                            })
981                                        },
982                                    )
983                                },
984                            ),
985                        );
986                        assert!(edges.is_empty(), "remaining edges: {:?}", edges);
987                        for (worker_id, worker_actors) in new_actors_to_create {
988                            node_actors.entry(worker_id).or_default().extend(
989                                worker_actors.values().flat_map(|(_, actors, _)| {
990                                    actors.iter().map(|(actor, _, _)| actor.actor_id)
991                                }),
992                            );
993                            actors_to_create
994                                .entry(worker_id)
995                                .or_default()
996                                .extend(worker_actors);
997                        }
998                        self.database_info.add_existing(InflightStreamingJobInfo {
999                            job_id,
1000                            fragment_infos: new_fragment_info,
1001                            subscribers: Default::default(), // no initial subscribers for newly created snapshot backfill
1002                            status: CreateStreamingJobStatus::Created,
1003                            cdc_table_backfill_tracker: None, // no cdc table backfill for snapshot backfill
1004                        });
1005                    }
1006
1007                    Some(PbMutation::Update(PbUpdateMutation {
1008                        dispatcher_update,
1009                        merge_update: vec![], // no upstream update on existing actors
1010                        actor_vnode_bitmap_update: Default::default(), /* no in place update vnode bitmap happened */
1011                        dropped_actors: vec![], /* no actors to drop in the partial graph of database */
1012                        actor_splits,
1013                        actor_new_dispatchers: Default::default(), // no new dispatcher
1014                        actor_cdc_table_snapshot_splits: None, /* no cdc table backfill in snapshot backfill */
1015                        sink_schema_change: Default::default(), /* no sink auto schema change happened here */
1016                        subscriptions_to_drop,
1017                    }))
1018                } else {
1019                    let fragment_ids = self.database_info.take_pending_backfill_nodes();
1020                    if fragment_ids.is_empty() {
1021                        None
1022                    } else {
1023                        Some(PbMutation::StartFragmentBackfill(
1024                            PbStartFragmentBackfillMutation { fragment_ids },
1025                        ))
1026                    }
1027                }
1028            }
1029        };
1030
1031        // Forward barrier to creating streaming job controls
1032        for (job_id, creating_job) in &mut self.creating_streaming_job_controls {
1033            if !finished_snapshot_backfill_jobs.contains(job_id) {
1034                let throttle_mutation = if let Some((ref jobs, ref config)) =
1035                    throttle_for_creating_jobs
1036                    && jobs.contains(job_id)
1037                {
1038                    assert_eq!(
1039                        jobs.len(),
1040                        1,
1041                        "should not alter rate limit of snapshot backfill job with other jobs"
1042                    );
1043                    Some((
1044                        Mutation::Throttle(ThrottleMutation {
1045                            fragment_throttle: config
1046                                .iter()
1047                                .map(|(fragment_id, config)| (*fragment_id, *config))
1048                                .collect(),
1049                        }),
1050                        take(notifiers),
1051                    ))
1052                } else {
1053                    None
1054                };
1055                creating_job.on_new_upstream_barrier(
1056                    partial_graph_manager,
1057                    barrier_info,
1058                    throttle_mutation,
1059                )?;
1060            }
1061        }
1062
1063        partial_graph_manager.inject_barrier(
1064            to_partial_graph_id(self.database_id, None),
1065            mutation,
1066            barrier_info,
1067            &node_actors,
1068            InflightFragmentInfo::existing_table_ids(self.database_info.fragment_infos()),
1069            InflightFragmentInfo::workers(self.database_info.fragment_infos()),
1070            actors_to_create,
1071        )?;
1072
1073        Ok(ApplyCommandInfo {
1074            mv_subscription_max_retention: self.database_info.max_subscription_retention(),
1075            table_ids_to_commit,
1076            jobs_to_wait: finished_snapshot_backfill_jobs,
1077            command: post_collect_command,
1078        })
1079    }
1080}