risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up the catalogs of creating streaming jobs that are in foreground mode or whose
    /// table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

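    /// Stop the sink coordinators of all sinks in the given database, or reset the sink
    /// manager entirely when no database is specified.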
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

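    /// Abort dirty pending sink state left over from before recovery: for each pending sink,
    /// resolve the committed epoch of its first state table from the current hummock version
    /// and hand the result to `abort_pending_sink_epochs`.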
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

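    /// Return the definitions of background creating streaming jobs keyed by job id,
    /// optionally scoped to a single database.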
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

    /// Resolve in-flight actor and fragment info from the metadata manager for the given
    /// database (or for all databases when `database_id` is `None`), keyed by database id,
    /// job id, and fragment id.
    async fn resolve_database_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

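    /// Resolve, from the given background jobs and the current hummock version:
    /// - the committed epoch of every state table in the version, and
    /// - for each upstream table of a snapshot-backfill job whose downstream lags behind the
    ///   upstream, the change-log epochs the downstream still needs to replay.
    ///
    /// Also validates that all state tables of the same job share one committed epoch.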
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    // The downstream job has consumed the upstream at least up to
                    // max(snapshot epoch, its own committed epoch); keep the minimum such
                    // epoch across all downstream jobs of this upstream table.
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                            // Ok: the resolved change log starts exactly at the downstream
                            // committed epoch.
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

    /// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
    /// newly added or dropped upstreams are not persisted in the meta store. Therefore, when restoring
    /// jobs, we need to rebuild the information required by the operator from the current state of its
    /// upstreams (sinks) and downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

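    /// Reload the full runtime info snapshot for global recovery: clean dirty streaming jobs,
    /// reset sink coordinators, abort dirty pending sink state, recover background job progress,
    /// resolve in-flight actor info against the active streaming workers, purge state tables
    /// from hummock, resolve committed and change-log epochs, and assemble the
    /// `BarrierWorkerRuntimeInfoSnapshot`.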
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None).await?;

        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_splits,
        })
    }

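    /// Per-database counterpart of `reload_runtime_info_impl`, used for partial recovery of a
    /// single database. Returns `Ok(None)` when no in-flight info is resolved for the database.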
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let mut all_info = self
            .resolve_database_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }

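    /// Rebuild the `StreamActor` descriptor of every in-flight actor by combining fragment info
    /// with per-job extra info (expression context, job definition, and config override).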
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}