risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor, StreamContext};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{REFRESH_TABLE_PROGRESS_TRACKER, SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up the catalogs of dirty creating streaming jobs, i.e. jobs that are in foreground
    /// mode or whose table fragments have not been persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        // Unregister the sources of the cleaned-up jobs.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

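    /// Purge stale state tables from Hummock, given the set of state table ids that are still
    /// referenced by in-flight fragments.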
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

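    /// List background creating jobs (optionally scoped to a single database) together with
    /// their definitions, keyed by job id, so that their progress can be re-tracked after
    /// recovery.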
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

    /// Resolve the in-flight actor and fragment information used for recovery from the catalog
    /// and the active worker nodes, grouped by database, streaming job and fragment. When
    /// `database_id` is given, only that database is loaded.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

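    /// From the given Hummock `version`, resolve
    /// - the committed epoch of every state table, and
    /// - for each upstream table of a recovered snapshot-backfill job, the change-log epochs
    ///   that the lagging downstream still has to replay before it catches up.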
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
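                    // Pin the upstream change log from the later of the snapshot epoch this job
                    // backfilled from and the epoch the job has committed; the minimum across
                    // all downstream jobs of an upstream table is taken below.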
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
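        // For every upstream table pinned by a snapshot-backfill downstream, check that the
        // upstream is not behind the downstream and collect the change-log epochs that the
        // downstream still has to replay to catch up.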
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

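    /// Collect, per job, the actors of fragments containing a `StreamCdcScan` node, i.e. the
    /// actors that perform CDC table backfill and need snapshot split assignments on recovery.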
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_info in fragments.values() {
                if fragment_info
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_info.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

    /// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically and
    /// the newly added or removed upstreams are not persisted in the meta store. Therefore, when
    /// restoring jobs, the information required by the operator has to be rebuilt from the current
    /// state of its upstream sinks and its downstream table.
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            // Locate the fragment that contains the `UpstreamSinkUnion` node, if any.
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

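    /// Reload the full runtime info snapshot for a global recovery: clean up dirty jobs, resolve
    /// the in-flight graph, purge unused state tables from Hummock, and rebuild epochs, actors,
    /// source splits and CDC snapshot split assignments.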
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                .await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!(
                "unreschedulable background jobs: {:?}",
                unreschedulable_jobs
            );
        }

        // Resolve actor info for recovery. If there's no actor to recover, most of the
        // following steps will be no-op, while the compute nodes will still be reset.
        // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                 This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_graph_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

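        // Resolve committed epochs (and change-log epochs for snapshot backfill) only for the
        // jobs that are still being created in the background.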
        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

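        // Re-assign CDC table snapshot splits to the CDC backfill actors. A new assignment
        // generation is allocated only when there is at least one assignment.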
        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
            info.values().flat_map(|jobs| jobs.iter()),
        );

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment =
            assign_cdc_table_snapshot_splits_pairs(
                cdc_table_backfill_actors,
                self.env.meta_store_ref(),
                self.env.cdc_table_backfill_tracker.completed_job_ids(),
            )
            .await?;
        let cdc_table_snapshot_split_assignment =
            if cdc_table_snapshot_split_assignment.is_empty() {
                CdcTableSnapshotSplitAssignmentWithGeneration::empty()
            } else {
                let generation = self
                    .env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter());
                CdcTableSnapshotSplitAssignmentWithGeneration::new(
                    cdc_table_snapshot_split_assignment,
                    generation,
                )
            };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

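    /// Reload the runtime info snapshot of a single database for per-database recovery. Returns
    /// `Ok(None)` when the database has no in-flight streaming jobs to recover.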
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let all_info = self
            .resolve_graph_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .get(&database_id)
            .cloned()
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

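        // Resolve epochs only for background jobs of this database; each of them is expected to
        // be present in the resolved graph info, since `info[job_id]` would panic otherwise.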
        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        REFRESH_TABLE_PROGRESS_TRACKER
            .lock()
            .remove_tracker_by_database_id(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

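    /// Rebuild the `StreamActor` descriptors from the in-flight fragment info, filling in the
    /// per-job extra info (timezone and job definition) loaded from the catalog.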
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let StreamingJobExtraInfo {
                timezone,
                job_definition,
            } = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;

            let expr_context = Some(StreamContext { timezone }.to_expr_context());

            for (fragment_id, fragment_info) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_info.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: expr_context.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}
765}