use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{HashMap, HashSet};

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, FragmentTypeFlag, TableId};
use risingwave_common::id::JobId;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_connector::source::cdc::CdcTableSnapshotSplitAssignmentWithGeneration;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::assign_cdc_table_snapshot_splits_pairs;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
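    /// Clean up catalog state left dirty by a previous failure: drop dirty subscriptions and
    /// dirty creating jobs (scoped to `database_id` if given), reset all refresh jobs back to
    /// idle, and unregister the sources associated with the cleaned creating jobs.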
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

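    /// Stop sink coordinators before recovery: only the coordinators of the given database's
    /// sinks when `database_id` is set, otherwise reset the sink manager entirely.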
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

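    /// Abort the pending (uncommitted) epochs of sinks. For each pending sink, the committed
    /// epoch of its first state table in the current hummock version is resolved and passed to
    /// `abort_pending_sink_epochs` as the abort boundary.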
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

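    /// Delegate to `HummockManager::purge` with the set of state tables still referenced by
    /// inflight fragments, so that state tables outside this set can be cleaned up.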
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

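    /// List background creating streaming jobs (optionally scoped to `database_id`) and return
    /// a map from job id to job definition.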
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

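    /// Load the inflight fragment info of all databases (or only `database_id` if given),
    /// grouped by database, job and fragment, resolved against the given snapshot of active
    /// streaming worker nodes.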
    async fn resolve_database_info(
        &self,
        database_id: Option<DatabaseId>,
        worker_nodes: &ActiveStreamingWorkerNodes,
    ) -> MetaResult<HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>>
    {
        let all_actor_infos = self
            .metadata_manager
            .catalog_controller
            .load_all_actors_dynamic(database_id, worker_nodes)
            .await?;

        Ok(all_actor_infos
            .into_iter()
            .map(|(loaded_database_id, job_fragment_infos)| {
                if let Some(database_id) = database_id {
                    assert_eq!(database_id, loaded_database_id);
                }
                (
                    loaded_database_id,
                    job_fragment_infos
                        .into_iter()
                        .map(|(job_id, fragment_infos)| {
                            (
                                job_id,
                                fragment_infos
                                    .into_iter()
                                    .map(|(fragment_id, info)| (fragment_id as _, info))
                                    .collect(),
                            )
                        })
                        .collect(),
                )
            })
            .collect())
    }

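    /// Resolve, from the given hummock `version`:
    /// - the committed epoch of every state table in the version, and
    /// - for each upstream table of a snapshot-backfill background job, the change-log epochs
    ///   between the smallest epoch still pinned by a downstream job and the upstream committed
    ///   epoch, which the downstream needs to replay after recovery.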
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different from other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

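    /// Collect, per job, the actors of fragments that contain a `StreamCdcScan` node, so that
    /// CDC table snapshot splits can be reassigned to them during recovery.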
    fn collect_cdc_table_backfill_actors<'a, I>(jobs: I) -> HashMap<JobId, HashSet<ActorId>>
    where
        I: Iterator<Item = (&'a JobId, &'a HashMap<FragmentId, InflightFragmentInfo>)>,
    {
        let mut cdc_table_backfill_actors = HashMap::new();

        for (job_id, fragments) in jobs {
            for fragment_infos in fragments.values() {
                if fragment_infos
                    .fragment_type_mask
                    .contains(FragmentTypeFlag::StreamCdcScan)
                {
                    cdc_table_backfill_actors
                        .entry(*job_id)
                        .or_insert_with(HashSet::new)
                        .extend(fragment_infos.actors.keys().cloned());
                }
            }
        }

        cdc_table_backfill_actors
    }

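    /// For every user-created table in `inflight_jobs` that has an `UpstreamSinkUnion` fragment,
    /// refill the union node with the current upstream sink infos so that sink-into-table
    /// connections are restored after recovery. Tables created by old versions without such a
    /// fragment are skipped.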
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut HashMap<
            DatabaseId,
            HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>,
        >,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by an old version has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

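    /// Reload the global runtime info snapshot used by full recovery: clean up dirty state,
    /// resolve inflight fragment info for all databases, and rebuild committed epochs, stream
    /// actors, fragment relations, source splits and CDC snapshot split assignments.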
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        self.clean_dirty_streaming_jobs(None)
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(None)
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(None)
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!("recovering background job progress");
        let background_jobs = self
            .list_background_job_progress(None)
            .await
            .context("recover background job progress should not fail")?;

        tracing::info!("recovered background job progress");

        let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
        self.metadata_manager
            .catalog_controller
            .cleanup_dropped_tables()
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

        tracing::info!(
            "background streaming jobs: {:?} total {}",
            background_streaming_jobs,
            background_streaming_jobs.len()
        );

        let unreschedulable_jobs = {
            let mut unreschedulable_jobs = HashSet::new();

            for job_id in background_streaming_jobs {
                let scan_types = self
                    .metadata_manager
                    .get_job_backfill_scan_types(job_id)
                    .await?;

                if scan_types
                    .values()
                    .any(|scan_type| !scan_type.is_reschedulable())
                {
                    unreschedulable_jobs.insert(job_id);
                }
            }

            unreschedulable_jobs
        };

        if !unreschedulable_jobs.is_empty() {
            tracing::info!("unreschedulable background jobs: {:?}", unreschedulable_jobs);
        }

        let mut info = if unreschedulable_jobs.is_empty() {
            info!("trigger offline scaling");
            self.resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        } else {
            bail!(
                "Recovery for unreschedulable background jobs is not yet implemented. \
                This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                unreschedulable_jobs
            );
        };

        let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
        if !dropped_table_ids.is_empty() {
            self.metadata_manager
                .catalog_controller
                .complete_dropped_tables(dropped_table_ids)
                .await;
            info = self
                .resolve_database_info(None, &active_streaming_nodes)
                .await
                .inspect_err(|err| {
                    warn!(error = %err.as_report(), "resolve actor info failed");
                })?
        }

        self.recovery_table_with_upstream_sinks(&mut info).await?;

        let info = info;

        self.purge_state_table_from_hummock(
            &info
                .values()
                .flatten()
                .flat_map(|(_, fragments)| {
                    InflightFragmentInfo::existing_table_ids(fragments.values())
                })
                .collect(),
        )
        .await
        .context("purge state table from hummock")?;

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    info.values().flat_map(|jobs| {
                        jobs.iter().filter_map(|(job_id, job)| {
                            background_jobs
                                .contains_key(job_id)
                                .then_some((*job_id, job))
                        })
                    }),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(None)
            .await?;

        let stream_actors = self.load_stream_actors(&info).await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .flat_map(|(_, job)| job.keys())
                    .map(|fragment_id| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let background_jobs = {
            let mut background_jobs = self
                .list_background_job_progress(None)
                .await
                .context("recover background job progress should not fail")?;
            info.values()
                .flatten()
                .filter_map(|(job_id, _)| {
                    background_jobs
                        .remove(job_id)
                        .map(|definition| (*job_id, definition))
                })
                .collect()
        };

        let database_infos = self
            .metadata_manager
            .catalog_controller
            .list_databases()
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment_infos) in info.values().flatten() {
            for fragment in fragment_infos.values() {
                for (actor_id, info) in &fragment.actors {
                    source_splits.insert(*actor_id, info.splits.clone());
                }
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(
            info.values().flat_map(|jobs| jobs.iter()),
        );

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            let generation = self
                .env
                .cdc_table_backfill_tracker
                .next_generation(cdc_table_ids.into_iter());
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                generation,
            )
        };
        Ok(BarrierWorkerRuntimeInfoSnapshot {
            active_streaming_nodes,
            database_job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            hummock_version_stats: self.hummock_manager.get_version_stats().await,
            database_infos,
            cdc_table_snapshot_split_assignment,
        })
    }

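    /// Reload the runtime info snapshot of a single database for per-database recovery.
    /// Returns `Ok(None)` if the database has no inflight streaming jobs.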
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let mut all_info = self
            .resolve_database_info(Some(database_id), &active_streaming_nodes)
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "resolve actor info failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_backfill_actors = Self::collect_cdc_table_backfill_actors(info.iter());

        let cdc_table_ids = cdc_table_backfill_actors
            .keys()
            .cloned()
            .collect::<Vec<_>>();
        let cdc_table_snapshot_split_assignment = assign_cdc_table_snapshot_splits_pairs(
            cdc_table_backfill_actors,
            self.env.meta_store_ref(),
            self.env.cdc_table_backfill_tracker.completed_job_ids(),
        )
        .await?;
        let cdc_table_snapshot_split_assignment = if cdc_table_snapshot_split_assignment.is_empty()
        {
            CdcTableSnapshotSplitAssignmentWithGeneration::empty()
        } else {
            CdcTableSnapshotSplitAssignmentWithGeneration::new(
                cdc_table_snapshot_split_assignment,
                self.env
                    .cdc_table_backfill_tracker
                    .next_generation(cdc_table_ids.into_iter()),
            )
        };

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_split_assignment,
        }))
    }

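    /// Rebuild the `StreamActor` of every inflight actor from its fragment info and the extra
    /// info of its owning job (definition, stream context and config override).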
    async fn load_stream_actors(
        &self,
        all_info: &HashMap<DatabaseId, HashMap<JobId, HashMap<FragmentId, InflightFragmentInfo>>>,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}