risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up the catalogs of creating streaming jobs that are in foreground mode, or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

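    /// Purge state tables from Hummock, retaining only those in `all_state_table_ids`.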
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

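    /// List background creating streaming jobs, optionally scoped to a database, returning a map
    /// from job id to the job's definition.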
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to adjust the set of actors that barriers are sent to or collected
    /// from, because these actors will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

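    /// Resolve, from the given Hummock version, the committed epoch of every state table, as well as
    /// the change-log epochs that each snapshot-backfill background job still needs to consume from
    /// its upstream tables to catch up with the upstream committed epoch.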
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {}, different from table {} with committed epoch {}, in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        // The first change-log entry must start exactly at the downstream committed epoch.
                        if !matches!(
                            epochs.first(),
                            Some((_, first_checkpoint_epoch)) if *first_checkpoint_epoch == downstream_committed_epoch
                        ) {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} at epoch {} has a lagging downstream at epoch {}, but has no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

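    /// Collect the actors of all `StreamCdcScan` fragments, grouped by job id.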
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_info in fragments.values() {
                if fragment_info
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_info.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

    /// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
    /// newly added or removed upstreams are not persisted in the meta store. Therefore, when restoring
    /// jobs, we need to rebuild the information required by the operator from the current state of its
    /// upstreams (sinks) and downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

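    /// Reload the global runtime info snapshot for a full recovery: clean dirty streaming jobs,
    /// resolve in-flight actor info for all databases, and rebuild committed epochs, stream actors,
    /// source splits and CDC snapshot split assignments from the latest metadata and Hummock version.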
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                .await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
            info.values().flat_map(|jobs| jobs.iter()),
        );

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment =
            assign_cdc_table_snapshot_splits_pairs(
                cdc_table_backfill_actors,
                self.env.meta_store_ref(),
                self.env.cdc_table_backfill_tracker.completed_job_ids(),
            )
            .await?;
        let cdc_table_snapshot_split_assignment =
            if cdc_table_snapshot_split_assignment.is_empty() {
                CdcTableSnapshotSplitAssignmentWithGeneration::empty()
            } else {
                let generation = self
                    .env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter());
                CdcTableSnapshotSplitAssignmentWithGeneration::new(
                    cdc_table_snapshot_split_assignment,
                    generation,
                )
            };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

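    /// Reload the runtime info snapshot of a single database for per-database recovery.
    /// Returns `Ok(None)` if the database has no in-flight streaming jobs to recover.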
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let all_info = self
            .resolve_graph_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .get(&database_id)
            .cloned()
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        REFRESH_TABLE_PROGRESS_TRACKER
            .lock()
            .remove_tracker_by_database_id(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

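    /// Rebuild the `StreamActor` of every actor in the given in-flight fragment info, attaching the
    /// owning job's expression context, definition and config override.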
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_info) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_info.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
763}