risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::ObjectId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::utils::StreamingJobExtraInfo;
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor, StreamContext};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs for creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        let database_id = database_id.map(|database_id| database_id.database_id as _);
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .catalog_controller
            .reset_refreshing_tables(database_id)
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

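    /// List in-progress background creating jobs (optionally scoped to one database) and return
    /// their definitions keyed by job id, so that their progress can be tracked again after recovery.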
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<TableId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(
                false,
                database_id.map(|database_id| database_id.database_id as _),
            )
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(id, definition, _init_at)| {
                let table_id = TableId::new(id as _);
                (table_id, definition)
            })
            .collect())
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the set of actors to be sent or collected, because these
    /// actors will be created or dropped before this barrier flows through them.
    async fn resolve_graph_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let database_id = database_id.map(|database_id| database_id.database_id as _);

        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    DatabaseId::new(loaded_database_id as _),
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            let job_id = TableId::new(job_id as _);
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

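    /// For the given background jobs, resolve from the current Hummock `version`:
    /// 1. the committed epoch of every state table in the version, and
    /// 2. for each upstream MV table of a recovered snapshot-backfill job, the change-log epochs the
    ///    downstream still has to consume to catch up with the upstream's committed epoch.
    ///
    /// As a rough worked example of the pinning rule below: if a snapshot-backfill job has committed
    /// epoch 100 and took its snapshot of an upstream MV at epoch 90, the downstream is pinned at
    /// `max(90, 100) = 100`, so the upstream must provide change-log entries from epoch 100 up to its
    /// own committed epoch.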
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (TableId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
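        // For each upstream MV table referenced by a snapshot-backfill job, track the minimum epoch
        // that any of its downstream jobs is still pinned at.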
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled the snapshot epoch for upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
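        // For each upstream table whose committed epoch is ahead of the epoch its downstreams are
        // pinned at, collect the change-log epochs covering the gap so the downstreams can catch up.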
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} is later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} does not match downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} at epoch {} has a downstream lagging at epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

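    /// Collect, for each CDC table job, the set of actors belonging to fragments that contain a
    /// `StreamCdcScan` node, keyed by the job's table id.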
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<u32, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a TableId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_info in fragments.values() {
                if fragment_info
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(job_id.table_id())
                        .or_insert_with(HashSet::new)
                        .extend(fragment_info.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

    /// During normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the
    /// newly added or removed upstreams are not persisted in the meta store. Therefore, when restoring
    /// jobs, we need to rebuild the information required by the operator from the current state of its
    /// upstreams (sinks) and its downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (tid, job) in table_map {
                    if acc.insert(tid.table_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate table id found: {:?}", tid).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let job_ids = jobs.keys().cloned().collect_vec();
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(job_ids.into_iter().map(|id| id as _).collect())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id).unwrap();
            let mut target_fragment_id = None;
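            // Locate the fragment whose stream node graph contains the `UpstreamSinkUnion` operator.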
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id.table_id as ObjectId)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-op, while the compute nodes will still be reset.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if unreschedulable_jobs.is_empty() {
                        info!("trigger offline scaling");
                        self.resolve_graph_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    } else {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    };

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(
                                dropped_table_ids.into_iter().map(|id| id.table_id as _),
                            )
                            .await;
                        info = self
                            .resolve_graph_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors = self.load_stream_actors(&info).await?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.keys())
                                .map(|fragment_id| *fragment_id as _)
                                .collect(),
                        )
                        .await?;

                    let background_jobs = {
                        let mut background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    // get split assignments for all actors
                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

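                    // Reassign CDC table snapshot splits to the recovered backfill actors; if any
                    // are assigned, tag them with a new assignment generation.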
                    let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
                        info.values().flat_map(|jobs| jobs.iter()),
                    );

                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids.into_iter().map(|id| id.table_id as _))
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let all_info = self
            .resolve_graph_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .get(&database_id)
            .cloned()
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

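        // Reassign CDC table snapshot splits to the recovered backfill actors, mirroring the logic
        // in `reload_runtime_info_impl`.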
        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };
        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

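    /// Rebuild the `StreamActor` metadata for every in-flight actor from the resolved fragment info,
    /// combined with per-job extra info (timezone and job definition) loaded from the catalog.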
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<TableId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().map(|job_id| job_id.table_id as i32))
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let StreamingJobExtraInfo {
                timezone,
                job_definition,
            } = job_extra_info
                .get(&(job_id.table_id as i32))
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id.table_id))?;

            let expr_context = Some(StreamContext { timezone }.to_expr_context());

            for (fragment_id, fragment_info) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_info.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id as _,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: expr_context.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}