risingwave_meta/barrier/context/
recovery.rs

1// Copyright 2024 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::cmp::{Ordering, max, min};
16use std::collections::hash_map::Entry;
17use std::collections::{BTreeMap, HashMap, HashSet};
18use std::num::NonZeroUsize;
19
20use anyhow::{Context, anyhow};
21use itertools::Itertools;
22use risingwave_common::bail;
23use risingwave_common::catalog::{DatabaseId, TableId};
24use risingwave_common::id::JobId;
25use risingwave_common::system_param::AdaptiveParallelismStrategy;
26use risingwave_common::system_param::reader::SystemParamsRead;
27use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
28use risingwave_hummock_sdk::version::HummockVersion;
29use risingwave_meta_model::SinkId;
30use risingwave_pb::catalog::table::PbTableType;
31use risingwave_pb::stream_plan::stream_node::PbNodeBody;
32use thiserror_ext::AsReport;
33use tracing::{info, warn};
34
35use super::BarrierWorkerRuntimeInfoSnapshot;
36use crate::MetaResult;
37use crate::barrier::DatabaseRuntimeInfoSnapshot;
38use crate::barrier::context::GlobalBarrierWorkerContextImpl;
39use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
40use crate::controller::scale::{
41    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
42};
43use crate::manager::ActiveStreamingWorkerNodes;
44use crate::model::{ActorId, FragmentId, StreamActor};
45use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
46use crate::stream::cdc::reload_cdc_table_snapshot_splits;
47use crate::stream::{SourceChange, StreamFragmentGraph};
48
49impl GlobalBarrierWorkerContextImpl {
50    /// Clean catalogs for creating streaming jobs that are in foreground mode or table fragments not persisted.
51    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
52        self.metadata_manager
53            .catalog_controller
54            .clean_dirty_subscription(database_id)
55            .await?;
56        let dirty_associated_source_ids = self
57            .metadata_manager
58            .catalog_controller
59            .clean_dirty_creating_jobs(database_id)
60            .await?;
61        self.metadata_manager
62            .reset_all_refresh_jobs_to_idle()
63            .await?;
64
65        // unregister cleaned sources.
66        self.source_manager
67            .apply_source_change(SourceChange::DropSource {
68                dropped_source_ids: dirty_associated_source_ids,
69            })
70            .await;
71
72        Ok(())
73    }
74
75    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
76        if let Some(database_id) = database_id {
77            let sink_ids = self
78                .metadata_manager
79                .catalog_controller
80                .list_sink_ids(Some(database_id))
81                .await?;
82            self.sink_manager.stop_sink_coordinator(sink_ids).await;
83        } else {
84            self.sink_manager.reset().await;
85        }
86        Ok(())
87    }
88
    /// Abort sink state that was left "pending" by an interrupted run.
    ///
    /// For every pending sink, resolves the committed epoch of its first state
    /// table from the current hummock version, then hands the sink→epoch map to
    /// `abort_pending_sink_epochs` (see that method for the abort contract).
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        // Fast path: nothing pending, nothing to abort.
        if pending_sinks.is_empty() {
            return Ok(());
        }

        // Map each pending sink to the ids of its state tables.
        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            // The first state table is taken as the representative of the sink;
            // a sink with no state table at all indicates corrupted metadata.
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        // Each sink id is visited exactly once, so this insert
                        // must never overwrite an existing entry.
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }
139
140    async fn purge_state_table_from_hummock(
141        &self,
142        all_state_table_ids: &HashSet<TableId>,
143    ) -> MetaResult<()> {
144        self.hummock_manager.purge(all_state_table_ids).await?;
145        Ok(())
146    }
147
148    async fn list_background_job_progress(
149        &self,
150        database_id: Option<DatabaseId>,
151    ) -> MetaResult<HashMap<JobId, String>> {
152        let mgr = &self.metadata_manager;
153        let job_info = mgr
154            .catalog_controller
155            .list_background_creating_jobs(false, database_id)
156            .await?;
157
158        Ok(job_info
159            .into_iter()
160            .map(|(job_id, definition, _init_at)| (job_id, definition))
161            .collect())
162    }
163
164    /// Async load stage: collect all metadata required for rendering actor assignments.
165    async fn load_fragment_context(
166        &self,
167        database_id: Option<DatabaseId>,
168    ) -> MetaResult<LoadedFragmentContext> {
169        self.metadata_manager
170            .catalog_controller
171            .load_fragment_context(database_id)
172            .await
173    }
174
175    /// Sync render stage: use loaded context and current workers to produce actor assignments.
176    fn render_actor_assignments(
177        &self,
178        database_id: Option<DatabaseId>,
179        loaded: &LoadedFragmentContext,
180        worker_nodes: &ActiveStreamingWorkerNodes,
181        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
182    ) -> MetaResult<FragmentRenderMap> {
183        if loaded.is_empty() {
184            return Ok(HashMap::new());
185        }
186
187        let available_workers: BTreeMap<_, _> = worker_nodes
188            .current()
189            .values()
190            .filter(|worker| worker.is_streaming_schedulable())
191            .map(|worker| {
192                (
193                    worker.id,
194                    WorkerInfo {
195                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
196                        resource_group: worker.resource_group(),
197                    },
198                )
199            })
200            .collect();
201
202        let RenderedGraph { fragments, .. } = render_actor_assignments(
203            self.metadata_manager
204                .catalog_controller
205                .env
206                .actor_id_generator(),
207            &available_workers,
208            adaptive_parallelism_strategy,
209            loaded,
210        )?;
211
212        if let Some(database_id) = database_id {
213            for loaded_database_id in fragments.keys() {
214                assert_eq!(*loaded_database_id, database_id);
215            }
216        }
217
218        Ok(fragments)
219    }
220
    /// From the given background jobs and the current hummock `version`, compute:
    /// 1. the committed epoch of every state table in the version, and
    /// 2. for each upstream table of a snapshot-backfill job whose downstream
    ///    lags behind, the change-log epochs `(non_checkpoint_epochs, checkpoint_epoch)`
    ///    the downstream still needs to replay.
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        // Snapshot of committed epochs per state table from the current version.
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        // For each upstream mv table, the minimum epoch pinned by any of its
        // snapshot-backfill downstreams.
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            // All state tables of one job must agree on a single committed
            // epoch; any mismatch means the recovered state is inconsistent.
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            // Only snapshot-backfill jobs pin epochs on their upstream tables.
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    // A recovered snapshot-backfill job must already know its
                    // snapshot epoch; `None` here is unrecoverable.
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    // The job has progressed to at least max(snapshot, committed);
                    // that is the epoch it pins on the upstream table.
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    // Keep the smallest pin across all downstreams of this upstream.
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    // Downstream cannot be ahead of its upstream.
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    // In sync: no change log needs to be replayed.
                    continue;
                }
                Ordering::Greater => {
                    // Downstream lags: collect the change-log epochs it must replay.
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        // Sanity check: the first resolved log entry must start
                        // exactly at the downstream's committed epoch; otherwise
                        // part of the log is missing. The empty `if` arm is the
                        // "check passed" case.
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }
349
    /// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and does not persist the
    /// newly added or deleted upstreams in meta-store. Therefore, when restoring jobs, we need to restore the
    /// information required by the operator based on the current state of the upstream (sink) and downstream (table) of
    /// the operator.
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut FragmentRenderMap,
    ) -> MetaResult<()> {
        // Flatten the per-database maps into one job_id -> job map, rejecting
        // duplicate job ids across databases.
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            // Locate the (single) fragment of this table that contains an
            // `UpstreamSinkUnion` node; the visitor stops at the first match.
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            // Tables created by old versions may lack the operator entirely;
            // they are skipped rather than treated as an error.
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            // Re-derive the operator's upstream sink info from current catalog
            // state and patch it into the rendered fragment in place.
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }
412
413    pub(super) async fn reload_runtime_info_impl(
414        &self,
415    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
416        {
417            {
418                {
419                    self.clean_dirty_streaming_jobs(None)
420                        .await
421                        .context("clean dirty streaming jobs")?;
422
423                    self.reset_sink_coordinator(None)
424                        .await
425                        .context("reset sink coordinator")?;
426                    self.abort_dirty_pending_sink_state(None)
427                        .await
428                        .context("abort dirty pending sink state")?;
429
430                    // Background job progress needs to be recovered.
431                    tracing::info!("recovering background job progress");
432                    let background_jobs = self
433                        .list_background_job_progress(None)
434                        .await
435                        .context("recover background job progress should not fail")?;
436
437                    tracing::info!("recovered background job progress");
438
439                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
440                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
441                    self.metadata_manager
442                        .catalog_controller
443                        .cleanup_dropped_tables()
444                        .await;
445
446                    let adaptive_parallelism_strategy = {
447                        let system_params_reader = self
448                            .metadata_manager
449                            .catalog_controller
450                            .env
451                            .system_params_reader()
452                            .await;
453                        system_params_reader.adaptive_parallelism_strategy()
454                    };
455
456                    let active_streaming_nodes =
457                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
458                            .await?;
459
460                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();
461
462                    tracing::info!(
463                        "background streaming jobs: {:?} total {}",
464                        background_streaming_jobs,
465                        background_streaming_jobs.len()
466                    );
467
468                    let unreschedulable_jobs = {
469                        let mut unreschedulable_jobs = HashSet::new();
470
471                        for job_id in background_streaming_jobs {
472                            let scan_types = self
473                                .metadata_manager
474                                .get_job_backfill_scan_types(job_id)
475                                .await?;
476
477                            if scan_types
478                                .values()
479                                .any(|scan_type| !scan_type.is_reschedulable())
480                            {
481                                unreschedulable_jobs.insert(job_id);
482                            }
483                        }
484
485                        unreschedulable_jobs
486                    };
487
488                    if !unreschedulable_jobs.is_empty() {
489                        tracing::info!(
490                            "unreschedulable background jobs: {:?}",
491                            unreschedulable_jobs
492                        );
493                    }
494
495                    // Resolve actor info for recovery. If there's no actor to recover, most of the
496                    // following steps will be no-op, while the compute nodes will still be reset.
497                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
498                    let mut info = if unreschedulable_jobs.is_empty() {
499                        info!("trigger offline scaling");
500                        let loaded = self.load_fragment_context(None).await.inspect_err(|err| {
501                            warn!(error = %err.as_report(), "load fragment context failed");
502                        })?;
503                        self.render_actor_assignments(
504                            None,
505                            &loaded,
506                            &active_streaming_nodes,
507                            adaptive_parallelism_strategy,
508                        )
509                        .inspect_err(|err| {
510                            warn!(error = %err.as_report(), "render actor assignments failed");
511                        })?
512                    } else {
513                        bail!(
514                            "Recovery for unreschedulable background jobs is not yet implemented. \
515                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
516                            unreschedulable_jobs
517                        );
518                    };
519
520                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
521                    if !dropped_table_ids.is_empty() {
522                        self.metadata_manager
523                            .catalog_controller
524                            .complete_dropped_tables(dropped_table_ids)
525                            .await;
526                        let loaded = self.load_fragment_context(None).await.inspect_err(|err| {
527                            warn!(error = %err.as_report(), "load fragment context failed");
528                        })?;
529                        info = self
530                            .render_actor_assignments(
531                                None,
532                                &loaded,
533                                &active_streaming_nodes,
534                                adaptive_parallelism_strategy,
535                            )
536                            .inspect_err(|err| {
537                                warn!(error = %err.as_report(), "render actor assignments failed");
538                            })?
539                    }
540
541                    self.recovery_table_with_upstream_sinks(&mut info).await?;
542
543                    let info = info;
544
545                    self.purge_state_table_from_hummock(
546                        &info
547                            .values()
548                            .flatten()
549                            .flat_map(|(_, fragments)| {
550                                InflightFragmentInfo::existing_table_ids(fragments.values())
551                            })
552                            .collect(),
553                    )
554                    .await
555                    .context("purge state table from hummock")?;
556
557                    let (state_table_committed_epochs, state_table_log_epochs) = self
558                        .hummock_manager
559                        .on_current_version(|version| {
560                            Self::resolve_hummock_version_epochs(
561                                info.values().flat_map(|jobs| {
562                                    jobs.iter().filter_map(|(job_id, job)| {
563                                        background_jobs
564                                            .contains_key(job_id)
565                                            .then_some((*job_id, job))
566                                    })
567                                }),
568                                version,
569                            )
570                        })
571                        .await?;
572
573                    let mv_depended_subscriptions = self
574                        .metadata_manager
575                        .get_mv_depended_subscriptions(None)
576                        .await?;
577
578                    let stream_actors = self.load_stream_actors(&info).await?;
579
580                    let fragment_relations = self
581                        .metadata_manager
582                        .catalog_controller
583                        .get_fragment_downstream_relations(
584                            info.values()
585                                .flatten()
586                                .flat_map(|(_, job)| job.keys())
587                                .map(|fragment_id| *fragment_id as _)
588                                .collect(),
589                        )
590                        .await?;
591
592                    let background_jobs = {
593                        let mut background_jobs = self
594                            .list_background_job_progress(None)
595                            .await
596                            .context("recover background job progress should not fail")?;
597                        info.values()
598                            .flatten()
599                            .filter_map(|(job_id, _)| {
600                                background_jobs
601                                    .remove(job_id)
602                                    .map(|definition| (*job_id, definition))
603                            })
604                            .collect()
605                    };
606
607                    let database_infos = self
608                        .metadata_manager
609                        .catalog_controller
610                        .list_databases()
611                        .await?;
612
613                    // get split assignments for all actors
614                    let mut source_splits = HashMap::new();
615                    for (_, fragment_infos) in info.values().flatten() {
616                        for fragment in fragment_infos.values() {
617                            for (actor_id, info) in &fragment.actors {
618                                source_splits.insert(*actor_id, info.splits.clone());
619                            }
620                        }
621                    }
622
623                    let cdc_table_snapshot_splits =
624                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
625                            .await?;
626
627                    Ok(BarrierWorkerRuntimeInfoSnapshot {
628                        active_streaming_nodes,
629                        database_job_infos: info,
630                        state_table_committed_epochs,
631                        state_table_log_epochs,
632                        mv_depended_subscriptions,
633                        stream_actors,
634                        fragment_relations,
635                        source_splits,
636                        background_jobs,
637                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
638                        database_infos,
639                        cdc_table_snapshot_splits,
640                    })
641                }
642            }
643        }
644    }
645
646    pub(super) async fn reload_database_runtime_info_impl(
647        &self,
648        database_id: DatabaseId,
649    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
650        self.clean_dirty_streaming_jobs(Some(database_id))
651            .await
652            .context("clean dirty streaming jobs")?;
653
654        self.reset_sink_coordinator(Some(database_id))
655            .await
656            .context("reset sink coordinator")?;
657        self.abort_dirty_pending_sink_state(Some(database_id))
658            .await
659            .context("abort dirty pending sink state")?;
660
661        // Background job progress needs to be recovered.
662        tracing::info!(
663            ?database_id,
664            "recovering background job progress of database"
665        );
666
667        let background_jobs = self
668            .list_background_job_progress(Some(database_id))
669            .await
670            .context("recover background job progress of database should not fail")?;
671        tracing::info!(?database_id, "recovered background job progress");
672
673        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
674        let dropped_table_ids = self
675            .scheduled_barriers
676            .pre_apply_drop_cancel(Some(database_id));
677        self.metadata_manager
678            .catalog_controller
679            .complete_dropped_tables(dropped_table_ids)
680            .await;
681
682        let adaptive_parallelism_strategy = {
683            let system_params_reader = self
684                .metadata_manager
685                .catalog_controller
686                .env
687                .system_params_reader()
688                .await;
689            system_params_reader.adaptive_parallelism_strategy()
690        };
691
692        let active_streaming_nodes =
693            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;
694
695        let loaded = self
696            .load_fragment_context(Some(database_id))
697            .await
698            .inspect_err(|err| {
699                warn!(error = %err.as_report(), "load fragment context failed");
700            })?;
701
702        let mut all_info = self
703            .render_actor_assignments(
704                Some(database_id),
705                &loaded,
706                &active_streaming_nodes,
707                adaptive_parallelism_strategy,
708            )
709            .inspect_err(|err| {
710                warn!(error = %err.as_report(), "render actor assignments failed");
711            })?;
712
713        let mut database_info = all_info
714            .remove(&database_id)
715            .map_or_else(HashMap::new, |table_map| {
716                HashMap::from([(database_id, table_map)])
717            });
718
719        self.recovery_table_with_upstream_sinks(&mut database_info)
720            .await?;
721
722        assert!(database_info.len() <= 1);
723
724        let stream_actors = self.load_stream_actors(&database_info).await?;
725
726        let Some(info) = database_info
727            .into_iter()
728            .next()
729            .map(|(loaded_database_id, info)| {
730                assert_eq!(loaded_database_id, database_id);
731                info
732            })
733        else {
734            return Ok(None);
735        };
736
737        let (state_table_committed_epochs, state_table_log_epochs) = self
738            .hummock_manager
739            .on_current_version(|version| {
740                Self::resolve_hummock_version_epochs(
741                    background_jobs
742                        .keys()
743                        .map(|job_id| (*job_id, &info[job_id])),
744                    version,
745                )
746            })
747            .await?;
748
749        let mv_depended_subscriptions = self
750            .metadata_manager
751            .get_mv_depended_subscriptions(Some(database_id))
752            .await?;
753
754        let fragment_relations = self
755            .metadata_manager
756            .catalog_controller
757            .get_fragment_downstream_relations(
758                info.values()
759                    .flatten()
760                    .map(|(fragment_id, _)| *fragment_id as _)
761                    .collect(),
762            )
763            .await?;
764
765        // get split assignments for all actors
766        let mut source_splits = HashMap::new();
767        for (_, fragment) in info.values().flatten() {
768            for (actor_id, info) in &fragment.actors {
769                source_splits.insert(*actor_id, info.splits.clone());
770            }
771        }
772
773        let cdc_table_snapshot_splits =
774            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
775                .await?;
776
777        self.refresh_manager
778            .remove_trackers_by_database(database_id);
779
780        Ok(Some(DatabaseRuntimeInfoSnapshot {
781            job_infos: info,
782            state_table_committed_epochs,
783            state_table_log_epochs,
784            mv_depended_subscriptions,
785            stream_actors,
786            fragment_relations,
787            source_splits,
788            background_jobs,
789            cdc_table_snapshot_splits,
790        }))
791    }
792
793    async fn load_stream_actors(
794        &self,
795        all_info: &FragmentRenderMap,
796    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
797        let job_ids = all_info
798            .values()
799            .flat_map(|jobs| jobs.keys().copied())
800            .collect_vec();
801
802        let job_extra_info = self
803            .metadata_manager
804            .catalog_controller
805            .get_streaming_job_extra_info(job_ids)
806            .await?;
807
808        let mut stream_actors = HashMap::new();
809
810        for (job_id, streaming_info) in all_info.values().flatten() {
811            let extra_info = job_extra_info
812                .get(job_id)
813                .cloned()
814                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
815            let expr_context = extra_info.stream_context().to_expr_context();
816            let job_definition = extra_info.job_definition;
817            let config_override = extra_info.config_override;
818
819            for (fragment_id, fragment_infos) in streaming_info {
820                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
821                    stream_actors.insert(
822                        *actor_id,
823                        StreamActor {
824                            actor_id: *actor_id as _,
825                            fragment_id: *fragment_id,
826                            vnode_bitmap: vnode_bitmap.clone(),
827                            mview_definition: job_definition.clone(),
828                            expr_context: Some(expr_context.clone()),
829                            config_override: config_override.clone(),
830                        },
831                    );
832                }
833            }
834        }
835        Ok(stream_actors)
836    }
837}