use std::cmp::{Ordering, max, min};
use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::num::NonZeroUsize;

use anyhow::{Context, anyhow};
use itertools::Itertools;
use risingwave_common::bail;
use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_common::id::JobId;
use risingwave_common::system_param::AdaptiveParallelismStrategy;
use risingwave_common::system_param::reader::SystemParamsRead;
use risingwave_common::util::stream_graph_visitor::visit_stream_node_cont;
use risingwave_hummock_sdk::version::HummockVersion;
use risingwave_meta_model::SinkId;
use risingwave_pb::catalog::table::PbTableType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use thiserror_ext::AsReport;
use tracing::{info, warn};

use super::BarrierWorkerRuntimeInfoSnapshot;
use crate::MetaResult;
use crate::barrier::DatabaseRuntimeInfoSnapshot;
use crate::barrier::context::GlobalBarrierWorkerContextImpl;
use crate::controller::fragment::{InflightActorInfo, InflightFragmentInfo};
use crate::controller::scale::{
    FragmentRenderMap, LoadedFragmentContext, RenderedGraph, WorkerInfo, render_actor_assignments,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::model::{ActorId, FragmentId, StreamActor};
use crate::rpc::ddl_controller::refill_upstream_sink_union_in_table;
use crate::stream::cdc::reload_cdc_table_snapshot_splits;
use crate::stream::{SourceChange, StreamFragmentGraph};

impl GlobalBarrierWorkerContextImpl {
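    /// Clean up metadata left over by streaming jobs that were still being created when the
    /// cluster went down: drop dirty subscriptions and dirty creating jobs, reset all refresh
    /// jobs to idle, and unregister the sources associated with the cleaned-up jobs.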
    async fn clean_dirty_streaming_jobs(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        self.metadata_manager
            .catalog_controller
            .clean_dirty_subscription(database_id)
            .await?;
        let dirty_associated_source_ids = self
            .metadata_manager
            .catalog_controller
            .clean_dirty_creating_jobs(database_id)
            .await?;
        self.metadata_manager
            .reset_all_refresh_jobs_to_idle()
            .await?;

        self.source_manager
            .apply_source_change(SourceChange::DropSource {
                dropped_source_ids: dirty_associated_source_ids,
            })
            .await;

        Ok(())
    }

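    /// Stop the sink coordinators of the given database, or reset the whole sink manager when
    /// no database is specified (i.e. during global recovery).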
    async fn reset_sink_coordinator(&self, database_id: Option<DatabaseId>) -> MetaResult<()> {
        if let Some(database_id) = database_id {
            let sink_ids = self
                .metadata_manager
                .catalog_controller
                .list_sink_ids(Some(database_id))
                .await?;
            self.sink_manager.stop_sink_coordinator(sink_ids).await;
        } else {
            self.sink_manager.reset().await;
        }
        Ok(())
    }

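    /// For every sink that still has pending (uncommitted) state, look up the committed epoch of
    /// its first state table in the current Hummock version and hand the per-sink committed
    /// epochs to the catalog so the pending sink epochs can be aborted.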
    async fn abort_dirty_pending_sink_state(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<()> {
        let pending_sinks: HashSet<SinkId> = self
            .metadata_manager
            .catalog_controller
            .list_all_pending_sinks(database_id)
            .await?;

        if pending_sinks.is_empty() {
            return Ok(());
        }

        let sink_with_state_tables: HashMap<SinkId, Vec<TableId>> = self
            .metadata_manager
            .catalog_controller
            .fetch_sink_with_state_table_ids(pending_sinks)
            .await?;

        let mut sink_committed_epoch: HashMap<SinkId, u64> = HashMap::new();

        for (sink_id, table_ids) in sink_with_state_tables {
            let Some(table_id) = table_ids.first() else {
                return Err(anyhow!("no state table id in sink: {}", sink_id).into());
            };

            self.hummock_manager
                .on_current_version(|version| -> MetaResult<()> {
                    if let Some(committed_epoch) = version.table_committed_epoch(*table_id) {
                        assert!(
                            sink_committed_epoch
                                .insert(sink_id, committed_epoch)
                                .is_none()
                        );
                        Ok(())
                    } else {
                        Err(anyhow!("cannot get committed epoch on table {}.", table_id).into())
                    }
                })
                .await?;
        }

        self.metadata_manager
            .catalog_controller
            .abort_pending_sink_epochs(sink_committed_epoch)
            .await?;

        Ok(())
    }

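    /// Ask the Hummock manager to purge state tables, passing the full set of state table ids
    /// that remain in use after recovery.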
    async fn purge_state_table_from_hummock(
        &self,
        all_state_table_ids: &HashSet<TableId>,
    ) -> MetaResult<()> {
        self.hummock_manager.purge(all_state_table_ids).await?;
        Ok(())
    }

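    /// List background creating streaming jobs, returning a map from job id to its definition.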
    async fn list_background_job_progress(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<HashMap<JobId, String>> {
        let mgr = &self.metadata_manager;
        let job_info = mgr
            .catalog_controller
            .list_background_creating_jobs(false, database_id)
            .await?;

        Ok(job_info
            .into_iter()
            .map(|(job_id, definition, _init_at)| (job_id, definition))
            .collect())
    }

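    /// Load the fragment context of all streaming jobs (optionally scoped to a single database)
    /// from the catalog, to be rendered into actor assignments during recovery.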
    async fn load_fragment_context(
        &self,
        database_id: Option<DatabaseId>,
    ) -> MetaResult<LoadedFragmentContext> {
        self.metadata_manager
            .catalog_controller
            .load_fragment_context(database_id)
            .await
    }

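    /// Render actor assignments for the loaded fragments onto the currently schedulable
    /// streaming workers, generating fresh actor ids and applying the adaptive parallelism
    /// strategy. Returns an empty map when nothing has been loaded.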
    fn render_actor_assignments(
        &self,
        database_id: Option<DatabaseId>,
        loaded: &LoadedFragmentContext,
        worker_nodes: &ActiveStreamingWorkerNodes,
        adaptive_parallelism_strategy: AdaptiveParallelismStrategy,
    ) -> MetaResult<FragmentRenderMap> {
        if loaded.is_empty() {
            return Ok(HashMap::new());
        }

        let available_workers: BTreeMap<_, _> = worker_nodes
            .current()
            .values()
            .filter(|worker| worker.is_streaming_schedulable())
            .map(|worker| {
                (
                    worker.id,
                    WorkerInfo {
                        parallelism: NonZeroUsize::new(worker.compute_node_parallelism()).unwrap(),
                        resource_group: worker.resource_group(),
                    },
                )
            })
            .collect();

        let RenderedGraph { fragments, .. } = render_actor_assignments(
            self.metadata_manager
                .catalog_controller
                .env
                .actor_id_generator(),
            &available_workers,
            adaptive_parallelism_strategy,
            loaded,
        )?;

        if let Some(database_id) = database_id {
            for loaded_database_id in fragments.keys() {
                assert_eq!(*loaded_database_id, database_id);
            }
        }

        Ok(fragments)
    }

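    /// From the current Hummock version, collect the committed epoch of every state table and,
    /// for upstream tables whose snapshot-backfill downstream jobs have not caught up, the
    /// change-log epochs those downstream jobs still have to replay. Fails if the version is
    /// inconsistent, e.g. the tables of one job sit at different committed epochs, or a lagged
    /// upstream table has no table change log.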
    #[expect(clippy::type_complexity)]
    fn resolve_hummock_version_epochs(
        background_jobs: impl Iterator<Item = (JobId, &HashMap<FragmentId, InflightFragmentInfo>)>,
        version: &HummockVersion,
    ) -> MetaResult<(
        HashMap<TableId, u64>,
        HashMap<TableId, Vec<(Vec<u64>, u64)>>,
    )> {
        let table_committed_epoch: HashMap<_, _> = version
            .state_table_info
            .info()
            .iter()
            .map(|(table_id, info)| (*table_id, info.committed_epoch))
            .collect();
        let get_table_committed_epoch = |table_id| -> anyhow::Result<u64> {
            Ok(*table_committed_epoch
                .get(&table_id)
                .ok_or_else(|| anyhow!("cannot get committed epoch on table {}.", table_id))?)
        };
        let mut min_downstream_committed_epochs = HashMap::new();
        for (job_id, fragments) in background_jobs {
            let job_committed_epoch = {
                let mut table_id_iter =
                    InflightFragmentInfo::existing_table_ids(fragments.values());
                let Some(first_table_id) = table_id_iter.next() else {
                    bail!("job {} has no state table", job_id);
                };
                let job_committed_epoch = get_table_committed_epoch(first_table_id)?;
                for table_id in table_id_iter {
                    let table_committed_epoch = get_table_committed_epoch(table_id)?;
                    if job_committed_epoch != table_committed_epoch {
                        bail!(
                            "table {} has committed epoch {} different to other table {} with committed epoch {} in job {}",
                            first_table_id,
                            job_committed_epoch,
                            table_id,
                            table_committed_epoch,
                            job_id
                        );
                    }
                }

                job_committed_epoch
            };
            if let (Some(snapshot_backfill_info), _) =
                StreamFragmentGraph::collect_snapshot_backfill_info_impl(
                    fragments
                        .values()
                        .map(|fragment| (&fragment.nodes, fragment.fragment_type_mask)),
                )?
            {
                for (upstream_table, snapshot_epoch) in
                    snapshot_backfill_info.upstream_mv_table_id_to_backfill_epoch
                {
                    let snapshot_epoch = snapshot_epoch.ok_or_else(|| {
                        anyhow!(
                            "recovered snapshot backfill job {} has not filled snapshot epoch to upstream {}",
                            job_id, upstream_table
                        )
                    })?;
                    let pinned_epoch = max(snapshot_epoch, job_committed_epoch);
                    match min_downstream_committed_epochs.entry(upstream_table) {
                        Entry::Occupied(entry) => {
                            let prev_min_epoch = entry.into_mut();
                            *prev_min_epoch = min(*prev_min_epoch, pinned_epoch);
                        }
                        Entry::Vacant(entry) => {
                            entry.insert(pinned_epoch);
                        }
                    }
                }
            }
        }
        let mut log_epochs = HashMap::new();
        for (upstream_table_id, downstream_committed_epoch) in min_downstream_committed_epochs {
            let upstream_committed_epoch = get_table_committed_epoch(upstream_table_id)?;
            match upstream_committed_epoch.cmp(&downstream_committed_epoch) {
                Ordering::Less => {
                    bail!(
                        "downstream epoch {} later than upstream epoch {} of table {}",
                        downstream_committed_epoch,
                        upstream_committed_epoch,
                        upstream_table_id
                    );
                }
                Ordering::Equal => {
                    continue;
                }
                Ordering::Greater => {
                    if let Some(table_change_log) = version.table_change_log.get(&upstream_table_id)
                    {
                        let epochs = table_change_log
                            .filter_epoch((downstream_committed_epoch, upstream_committed_epoch))
                            .map(|epoch_log| {
                                (
                                    epoch_log.non_checkpoint_epochs.clone(),
                                    epoch_log.checkpoint_epoch,
                                )
                            })
                            .collect_vec();
                        let first_epochs = epochs.first();
                        if let Some((_, first_checkpoint_epoch)) = &first_epochs
                            && *first_checkpoint_epoch == downstream_committed_epoch
                        {
                        } else {
                            bail!(
                                "resolved first log epoch {:?} on table {} not matched with downstream committed epoch {}",
                                epochs,
                                upstream_table_id,
                                downstream_committed_epoch
                            );
                        }
                        log_epochs
                            .try_insert(upstream_table_id, epochs)
                            .expect("non-duplicated");
                    } else {
                        bail!(
                            "upstream table {} on epoch {} has lagged downstream on epoch {} but no table change log",
                            upstream_table_id,
                            upstream_committed_epoch,
                            downstream_committed_epoch
                        );
                    }
                }
            }
        }
        Ok((table_committed_epoch, log_epochs))
    }

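    /// For each user-created table in the rendered graph, locate the fragment that contains an
    /// `UpstreamSinkUnion` node and refill it with the table's current upstream sink infos.
    /// Tables created by old versions that have not been migrated yet are skipped.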
    async fn recovery_table_with_upstream_sinks(
        &self,
        inflight_jobs: &mut FragmentRenderMap,
    ) -> MetaResult<()> {
        let mut jobs = inflight_jobs.values_mut().try_fold(
            HashMap::new(),
            |mut acc, table_map| -> MetaResult<_> {
                for (job_id, job) in table_map {
                    if acc.insert(*job_id, job).is_some() {
                        return Err(anyhow::anyhow!("Duplicate job id found: {}", job_id).into());
                    }
                }
                Ok(acc)
            },
        )?;
        let tables = self
            .metadata_manager
            .catalog_controller
            .get_user_created_table_by_ids(jobs.keys().copied())
            .await?;
        for table in tables {
            assert_eq!(table.table_type(), PbTableType::Table);
            let fragment_infos = jobs.get_mut(&table.id.as_job_id()).unwrap();
            let mut target_fragment_id = None;
            for fragment in fragment_infos.values() {
                let mut is_target_fragment = false;
                visit_stream_node_cont(&fragment.nodes, |node| {
                    if let Some(PbNodeBody::UpstreamSinkUnion(_)) = node.node_body {
                        is_target_fragment = true;
                        false
                    } else {
                        true
                    }
                });
                if is_target_fragment {
                    target_fragment_id = Some(fragment.fragment_id);
                    break;
                }
            }
            let Some(target_fragment_id) = target_fragment_id else {
                tracing::debug!(
                    "The table {} created by old versions has not yet been migrated, so sinks cannot be created or dropped on this table.",
                    table.id
                );
                continue;
            };
            let target_fragment = fragment_infos.get_mut(&target_fragment_id).unwrap();
            let upstream_infos = self
                .metadata_manager
                .catalog_controller
                .get_all_upstream_sink_infos(&table, target_fragment_id as _)
                .await?;
            refill_upstream_sink_union_in_table(&mut target_fragment.nodes, &upstream_infos);
        }

        Ok(())
    }

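    /// Rebuild the full runtime info snapshot for global recovery: clean up dirty state,
    /// re-render actor assignments on the active streaming workers, resolve committed and
    /// change-log epochs from the current Hummock version, and collect the stream actors,
    /// fragment relations, source splits and background job progress needed by the barrier
    /// worker.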
    pub(super) async fn reload_runtime_info_impl(
        &self,
    ) -> MetaResult<BarrierWorkerRuntimeInfoSnapshot> {
        {
            {
                {
                    self.clean_dirty_streaming_jobs(None)
                        .await
                        .context("clean dirty streaming jobs")?;

                    self.reset_sink_coordinator(None)
                        .await
                        .context("reset sink coordinator")?;
                    self.abort_dirty_pending_sink_state(None)
                        .await
                        .context("abort dirty pending sink state")?;

                    tracing::info!("recovering background job progress");
                    let background_jobs = self
                        .list_background_job_progress(None)
                        .await
                        .context("recover background job progress should not fail")?;

                    tracing::info!("recovered background job progress");

                    let _ = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    self.metadata_manager
                        .catalog_controller
                        .cleanup_dropped_tables()
                        .await;

                    let adaptive_parallelism_strategy = {
                        let system_params_reader = self
                            .metadata_manager
                            .catalog_controller
                            .env
                            .system_params_reader()
                            .await;
                        system_params_reader.adaptive_parallelism_strategy()
                    };

                    let active_streaming_nodes =
                        ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone())
                            .await?;

                    let background_streaming_jobs = background_jobs.keys().cloned().collect_vec();

                    tracing::info!(
                        "background streaming jobs: {:?} total {}",
                        background_streaming_jobs,
                        background_streaming_jobs.len()
                    );

                    let unreschedulable_jobs = {
                        let mut unreschedulable_jobs = HashSet::new();

                        for job_id in background_streaming_jobs {
                            let scan_types = self
                                .metadata_manager
                                .get_job_backfill_scan_types(job_id)
                                .await?;

                            if scan_types
                                .values()
                                .any(|scan_type| !scan_type.is_reschedulable())
                            {
                                unreschedulable_jobs.insert(job_id);
                            }
                        }

                        unreschedulable_jobs
                    };

                    if !unreschedulable_jobs.is_empty() {
                        tracing::info!(
                            "unreschedulable background jobs: {:?}",
                            unreschedulable_jobs
                        );
                    }

                    let mut info = if unreschedulable_jobs.is_empty() {
                        info!("trigger offline scaling");
                        let loaded = self.load_fragment_context(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "load fragment context failed");
                        })?;
                        self.render_actor_assignments(
                            None,
                            &loaded,
                            &active_streaming_nodes,
                            adaptive_parallelism_strategy,
                        )
                        .inspect_err(|err| {
                            warn!(error = %err.as_report(), "render actor assignments failed");
                        })?
                    } else {
                        bail!(
                            "Recovery for unreschedulable background jobs is not yet implemented. \
                            This path is triggered when the following jobs have at least one scan type that is not reschedulable: {:?}.",
                            unreschedulable_jobs
                        );
                    };

                    let dropped_table_ids = self.scheduled_barriers.pre_apply_drop_cancel(None);
                    if !dropped_table_ids.is_empty() {
                        self.metadata_manager
                            .catalog_controller
                            .complete_dropped_tables(dropped_table_ids)
                            .await;
                        let loaded = self.load_fragment_context(None).await.inspect_err(|err| {
                            warn!(error = %err.as_report(), "load fragment context failed");
                        })?;
                        info = self
                            .render_actor_assignments(
                                None,
                                &loaded,
                                &active_streaming_nodes,
                                adaptive_parallelism_strategy,
                            )
                            .inspect_err(|err| {
                                warn!(error = %err.as_report(), "render actor assignments failed");
                            })?
                    }

                    self.recovery_table_with_upstream_sinks(&mut info).await?;

                    let info = info;

                    self.purge_state_table_from_hummock(
                        &info
                            .values()
                            .flatten()
                            .flat_map(|(_, fragments)| {
                                InflightFragmentInfo::existing_table_ids(fragments.values())
                            })
                            .collect(),
                    )
                    .await
                    .context("purge state table from hummock")?;

                    let (state_table_committed_epochs, state_table_log_epochs) = self
                        .hummock_manager
                        .on_current_version(|version| {
                            Self::resolve_hummock_version_epochs(
                                info.values().flat_map(|jobs| {
                                    jobs.iter().filter_map(|(job_id, job)| {
                                        background_jobs
                                            .contains_key(job_id)
                                            .then_some((*job_id, job))
                                    })
                                }),
                                version,
                            )
                        })
                        .await?;

                    let mv_depended_subscriptions = self
                        .metadata_manager
                        .get_mv_depended_subscriptions(None)
                        .await?;

                    let stream_actors = self.load_stream_actors(&info).await?;

                    let fragment_relations = self
                        .metadata_manager
                        .catalog_controller
                        .get_fragment_downstream_relations(
                            info.values()
                                .flatten()
                                .flat_map(|(_, job)| job.keys())
                                .map(|fragment_id| *fragment_id as _)
                                .collect(),
                        )
                        .await?;

                    let background_jobs = {
                        let mut background_jobs = self
                            .list_background_job_progress(None)
                            .await
                            .context("recover background job progress should not fail")?;
                        info.values()
                            .flatten()
                            .filter_map(|(job_id, _)| {
                                background_jobs
                                    .remove(job_id)
                                    .map(|definition| (*job_id, definition))
                            })
                            .collect()
                    };

                    let database_infos = self
                        .metadata_manager
                        .catalog_controller
                        .list_databases()
                        .await?;

                    let mut source_splits = HashMap::new();
                    for (_, fragment_infos) in info.values().flatten() {
                        for fragment in fragment_infos.values() {
                            for (actor_id, info) in &fragment.actors {
                                source_splits.insert(*actor_id, info.splits.clone());
                            }
                        }
                    }

                    let cdc_table_snapshot_splits =
                        reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, None)
                            .await?;

                    Ok(BarrierWorkerRuntimeInfoSnapshot {
                        active_streaming_nodes,
                        database_job_infos: info,
                        state_table_committed_epochs,
                        state_table_log_epochs,
                        mv_depended_subscriptions,
                        stream_actors,
                        fragment_relations,
                        source_splits,
                        background_jobs,
                        hummock_version_stats: self.hummock_manager.get_version_stats().await,
                        database_infos,
                        cdc_table_snapshot_splits,
                    })
                }
            }
        }
    }

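    /// Rebuild the runtime info snapshot for recovering a single database. Returns `Ok(None)`
    /// when the database has no streaming jobs to recover.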
    pub(super) async fn reload_database_runtime_info_impl(
        &self,
        database_id: DatabaseId,
    ) -> MetaResult<Option<DatabaseRuntimeInfoSnapshot>> {
        self.clean_dirty_streaming_jobs(Some(database_id))
            .await
            .context("clean dirty streaming jobs")?;

        self.reset_sink_coordinator(Some(database_id))
            .await
            .context("reset sink coordinator")?;
        self.abort_dirty_pending_sink_state(Some(database_id))
            .await
            .context("abort dirty pending sink state")?;

        tracing::info!(
            ?database_id,
            "recovering background job progress of database"
        );

        let background_jobs = self
            .list_background_job_progress(Some(database_id))
            .await
            .context("recover background job progress of database should not fail")?;
        tracing::info!(?database_id, "recovered background job progress");

        let dropped_table_ids = self
            .scheduled_barriers
            .pre_apply_drop_cancel(Some(database_id));
        self.metadata_manager
            .catalog_controller
            .complete_dropped_tables(dropped_table_ids)
            .await;

        let adaptive_parallelism_strategy = {
            let system_params_reader = self
                .metadata_manager
                .catalog_controller
                .env
                .system_params_reader()
                .await;
            system_params_reader.adaptive_parallelism_strategy()
        };

        let active_streaming_nodes =
            ActiveStreamingWorkerNodes::new_snapshot(self.metadata_manager.clone()).await?;

        let loaded = self
            .load_fragment_context(Some(database_id))
            .await
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "load fragment context failed");
            })?;

        let mut all_info = self
            .render_actor_assignments(
                Some(database_id),
                &loaded,
                &active_streaming_nodes,
                adaptive_parallelism_strategy,
            )
            .inspect_err(|err| {
                warn!(error = %err.as_report(), "render actor assignments failed");
            })?;

        let mut database_info = all_info
            .remove(&database_id)
            .map_or_else(HashMap::new, |table_map| {
                HashMap::from([(database_id, table_map)])
            });

        self.recovery_table_with_upstream_sinks(&mut database_info)
            .await?;

        assert!(database_info.len() <= 1);

        let stream_actors = self.load_stream_actors(&database_info).await?;

        let Some(info) = database_info
            .into_iter()
            .next()
            .map(|(loaded_database_id, info)| {
                assert_eq!(loaded_database_id, database_id);
                info
            })
        else {
            return Ok(None);
        };

        let (state_table_committed_epochs, state_table_log_epochs) = self
            .hummock_manager
            .on_current_version(|version| {
                Self::resolve_hummock_version_epochs(
                    background_jobs
                        .keys()
                        .map(|job_id| (*job_id, &info[job_id])),
                    version,
                )
            })
            .await?;

        let mv_depended_subscriptions = self
            .metadata_manager
            .get_mv_depended_subscriptions(Some(database_id))
            .await?;

        let fragment_relations = self
            .metadata_manager
            .catalog_controller
            .get_fragment_downstream_relations(
                info.values()
                    .flatten()
                    .map(|(fragment_id, _)| *fragment_id as _)
                    .collect(),
            )
            .await?;

        let mut source_splits = HashMap::new();
        for (_, fragment) in info.values().flatten() {
            for (actor_id, info) in &fragment.actors {
                source_splits.insert(*actor_id, info.splits.clone());
            }
        }

        let cdc_table_snapshot_splits =
            reload_cdc_table_snapshot_splits(&self.env.meta_store_ref().conn, Some(database_id))
                .await?;

        self.refresh_manager
            .remove_trackers_by_database(database_id);

        Ok(Some(DatabaseRuntimeInfoSnapshot {
            job_infos: info,
            state_table_committed_epochs,
            state_table_log_epochs,
            mv_depended_subscriptions,
            stream_actors,
            fragment_relations,
            source_splits,
            background_jobs,
            cdc_table_snapshot_splits,
        }))
    }

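    /// Build the `StreamActor` descriptors for every rendered actor, attaching each job's
    /// definition, expression context and config override loaded from the catalog.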
    async fn load_stream_actors(
        &self,
        all_info: &FragmentRenderMap,
    ) -> MetaResult<HashMap<ActorId, StreamActor>> {
        let job_ids = all_info
            .values()
            .flat_map(|jobs| jobs.keys().copied())
            .collect_vec();

        let job_extra_info = self
            .metadata_manager
            .catalog_controller
            .get_streaming_job_extra_info(job_ids)
            .await?;

        let mut stream_actors = HashMap::new();

        for (job_id, streaming_info) in all_info.values().flatten() {
            let extra_info = job_extra_info
                .get(job_id)
                .cloned()
                .ok_or_else(|| anyhow!("no streaming job info for {}", job_id))?;
            let expr_context = extra_info.stream_context().to_expr_context();
            let job_definition = extra_info.job_definition;
            let config_override = extra_info.config_override;

            for (fragment_id, fragment_infos) in streaming_info {
                for (actor_id, InflightActorInfo { vnode_bitmap, .. }) in &fragment_infos.actors {
                    stream_actors.insert(
                        *actor_id,
                        StreamActor {
                            actor_id: *actor_id as _,
                            fragment_id: *fragment_id,
                            vnode_bitmap: vnode_bitmap.clone(),
                            mview_definition: job_definition.clone(),
                            expr_context: Some(expr_context.clone()),
                            config_override: config_override.clone(),
                        },
                    );
                }
            }
        }
        Ok(stream_actors)
    }
}