risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode, or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // Unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

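    /// Ask the hummock manager to purge state tables that are not in `all_state_table_ids`,
    /// i.e. tables that no longer belong to any streaming job after recovery cleanup.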
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

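    /// List background (non-foreground) creating streaming jobs, returning each job id together
    /// with its definition.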
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the actors to be sent or collected, because these actors
    /// will be created or dropped before this barrier flows through them.
    async fn resolve_database_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

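    /// From the current hummock `version`, resolve:
    /// - the committed epoch of every state table, and
    /// - for each upstream mv table of a recovered snapshot-backfill job, the change-log epochs
    ///   between the downstream's pinned epoch and the upstream's committed epoch, so that the
    ///   recovered job can catch up via the table change log.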
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {}, which differs from table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
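            // For snapshot-backfill jobs, each upstream mv table is pinned at
            // `max(snapshot_epoch, job_committed_epoch)`; keep the minimum pinned epoch per
            // upstream table across all recovered jobs, since the change log must reach back to
            // the furthest-behind downstream.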
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
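        // The upstream may have committed beyond the epoch its pinned downstreams have consumed;
        // collect the change-log epochs in between so the downstream can replay them, and bail if
        // the log does not cover the required range.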
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} at epoch {} has a downstream lagging at epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

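    /// Collect, for each job, the actors of fragments containing a `StreamCdcScan` node,
    /// i.e. the actors performing CDC table backfill.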
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_infos in fragments.values() {
                if fragment_infos
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_infos.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

    /// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
    /// newly added or deleted upstreams are not persisted in the meta store. Therefore, when restoring
    /// jobs, we need to rebuild the information required by the operator from the current state of its
    /// upstreams (sinks) and downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        // Only `Table` catalogs are returned here; other catalog objects are ignored.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
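            // Locate the fragment of this table whose stream node graph contains the
            // `UpstreamSinkUnion` operator; tables created by older versions may not have one.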
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} was created by an old version and has not yet been migrated, so sinks cannot be created or dropped on it.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

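    /// Reload the full runtime info snapshot for a global recovery: clean up dirty jobs, resolve
    /// inflight actor info for every database, purge unused state tables from hummock, and rebuild
    /// epochs, split assignments and background-job progress for the barrier worker.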
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?}, total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-ops, while the compute nodes will still be reset.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if unreschedulable_jobs.is_empty() {
                        info!("trigger offline scaling");
                        self.resolve_database_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    } else {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    };

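                    // Drop/cancel requests may have arrived while resolving; apply them and
                    // re-resolve so that the snapshot does not include the dropped jobs.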
                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(dropped_table_ids)
                            .await;
                        info = self
                            .resolve_database_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

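                    // Resolve per-table committed epochs, plus the change-log epochs needed by any
                    // snapshot-backfill jobs among the recovered background jobs.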
                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors = self.load_stream_actors(&info).await?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.keys())
                                .map(|fragment_id| *fragment_id as _)
                                .collect(),
                        )
                        .await?;

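                    // Re-list background job progress and keep only the jobs present in the
                    // resolved info.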
                    let background_jobs = {
                        let mut background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    // get split assignments for all actors
                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
                        info.values().flat_map(|jobs| jobs.iter()),
                    );

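                    // Re-assign CDC table snapshot splits to the backfill actors and, if any
                    // assignment exists, stamp it with a fresh generation from the backfill tracker.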
                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

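    /// Reload the runtime info snapshot for a single database during per-database recovery.
    /// Returns `Ok(None)` if the database has no inflight streaming jobs to recover.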
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let mut all_info = self
            .resolve_database_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

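        // Drop any refresh trackers of this database before handing out the new snapshot.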
        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

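    /// Rebuild the `StreamActor` descriptors for every actor in `all_info`, combining each actor's
    /// vnode bitmap with the per-job extra info (definition, expression context and config
    /// override) loaded from the catalog.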
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}