risingwave_meta/barrier/context/recovery.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
    /// Clean up catalogs of creating streaming jobs that are in foreground mode or whose table fragments are not persisted.
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        // unregister cleaned sources.
        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

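    /// Stop the sink coordinators of the sinks in the given database, or reset the whole sink
    /// manager when no database is specified.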
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

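    /// Abort sink states left pending before recovery: for each pending sink, look up the
    /// committed epoch of its first state table in the current Hummock version and abort its
    /// pending epochs based on that epoch.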
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

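    /// Purge state table metadata from Hummock, keeping only the given set of state table ids
    /// that are still in use by the recovered streaming jobs.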
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

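    /// List background creating jobs of the given database (or all databases) and return a map
    /// from job id to the job's definition.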
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

    /// Resolve actor information from the cluster, the fragment manager and `ChangedTableId`.
    /// We use `changed_table_id` to modify the set of actors to send to or collect from, because
    /// these actors will be created or dropped before this barrier flows through them.
    async fn resolve_database_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

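    /// From the given Hummock `version`, resolve the committed epoch of every state table, and,
    /// for background jobs that use snapshot backfill, the change-log epochs of their upstream
    /// tables that the downstream still needs to replay.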
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
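        // For each upstream table pinned by a snapshot-backfill job, the downstream may lag behind
        // the upstream's committed epoch; in that case the upstream's table change log must cover
        // the gap so that the downstream can replay it after recovery.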
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

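    /// Collect, for each job, the set of actors belonging to fragments that contain a
    /// `StreamCdcScan` node, i.e. the actors performing CDC table backfill.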
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_infos in fragments.values() {
                if fragment_infos
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_infos.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

    /// For normal DDL operations, the `UpstreamSinkUnion` operator is modified dynamically, and the newly added or
    /// deleted upstreams are not persisted in the meta store. Therefore, when restoring jobs, we need to rebuild the
    /// information required by the operator from the current state of its upstreams (sinks) and its downstream (table).
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        // Only `Table` will be returned here, ignoring other catalog objects.
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "Table {} was created by an older version and has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

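    /// Reload the full runtime info snapshot used by the barrier worker for global recovery:
    /// clean up dirty streaming jobs and sink state, resolve inflight actor and fragment info
    /// from the metadata store, purge unused state tables from Hummock, and rebuild source split
    /// and CDC backfill assignments.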
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    self.reset_sink_coordinator(None)
                        .await
                        .context("reset sink coordinator")?;
                    self.abort_dirty_pending_sink_state(None)
                        .await
                        .context("abort dirty pending sink state")?;

                    // Background job progress needs to be recovered.
                    tracing::info!("recovering background job progress");
                    let background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    // Resolve actor info for recovery. If there's no actor to recover, most of the
                    // following steps will be no-op, while the compute nodes will still be reset.
                    // TODO(error-handling): attach context to the errors and log them together, instead of inspecting everywhere.
                    let mut info = if unreschedulable_jobs.is_empty() {
                        info!("trigger offline scaling");
                        self.resolve_database_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    } else {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                             This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    };

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(dropped_table_ids)
                            .await;
                        info = self
                            .resolve_database_info(None, &active_streaming_nodes)
                            .await
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "resolve actor info failed");
                            })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors = self.load_stream_actors(&info).await?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.keys())
                                .map(|fragment_id| *fragment_id as _)
                                .collect(),
                        )
                        .await?;

                    let background_jobs = {
                        let mut background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    // get split assignments for all actors
                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
                        info.values().flat_map(|jobs| jobs.iter()),
                    );

                    let cdc_table_ids = cdc_table_backfill_actors
                        .keys()
                        .cloned()
                        .collect::<Vec<_>>();
                    let cdc_table_snapshot_split_assignment =
                        assign_cdc_table_snapshot_splits_pairs(
                            cdc_table_backfill_actors,
                            self.env.meta_store_ref(),
                            self.env.cdc_table_backfill_tracker.completed_job_ids(),
                        )
                        .await?;
                    let cdc_table_snapshot_split_assignment =
                        if cdc_table_snapshot_split_assignment.is_empty() {
                            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
                        } else {
                            let generation = self
                                .env
                                .cdc_table_backfill_tracker
                                .next_generation(cdc_table_ids.into_iter());
                            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                                cdc_table_snapshot_split_assignment,
                                generation,
                            )
                        };
                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_split_assignment,
                    })
                }
            }
        }
    }

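    /// Reload the runtime info snapshot of a single database for per-database recovery. Returns
    /// `None` if the database has no inflight streaming jobs to recover.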
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        // Background job progress needs to be recovered.
        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        // This is a quick path to accelerate the process of dropping and canceling streaming jobs.
        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let mut all_info = self
            .resolve_database_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        // get split assignments for all actors
        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

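    /// Build the `StreamActor` descriptions for all actors in the given inflight info, filling in
    /// per-job extra info such as the expression context, job definition and config override.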
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}