1use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{IsSingleton, VnodeCount, WorkerSlotId};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::stream_graph_visitor::visit_tables;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::plan_common::ExprContext;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{
29 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
30};
31
32use super::Locations;
33use crate::MetaResult;
34use crate::controller::cluster::StreamingClusterInfo;
35use crate::manager::{MetaSrvEnv, StreamingJob};
36use crate::model::{
37 Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
38 FragmentReplaceUpstream, StreamActor,
39};
40use crate::stream::stream_graph::fragment::{
41 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
42 StreamFragmentEdge,
43};
44use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
45use crate::stream::stream_graph::schedule;
46use crate::stream::stream_graph::schedule::Distribution;
47
48#[derive(Debug)]
50struct ActorBuilder {
51 actor_id: GlobalActorId,
53
54 fragment_id: GlobalFragmentId,
56
57 vnode_bitmap: Option<Bitmap>,
59}
60
61impl ActorBuilder {
62 fn new(
63 actor_id: GlobalActorId,
64 fragment_id: GlobalFragmentId,
65 vnode_bitmap: Option<Bitmap>,
66 ) -> Self {
67 Self {
68 actor_id,
69 fragment_id,
70 vnode_bitmap,
71 }
72 }
73}
74
75impl FragmentActorBuilder {
76 fn rewrite(&self) -> MetaResult<StreamNode> {
82 self.rewrite_inner(&self.node, 0)
83 }
84
85 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
86 match stream_node.get_node_body()? {
87 NodeBody::Exchange(exchange) => {
89 if depth == 0 {
92 bail!(
93 "there should be no ExchangeNode on the top of the plan node: {:#?}",
94 stream_node
95 )
96 }
97 assert!(!stream_node.get_fields().is_empty());
98 assert!(stream_node.input.is_empty());
99
100 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
102 link_id: stream_node.get_operator_id(),
103 }];
104
105 let upstream_fragment_id = upstream_fragment_id.as_global_id();
106
107 Ok(StreamNode {
108 node_body: Some(NodeBody::Merge(Box::new({
109 #[expect(deprecated)]
110 MergeNode {
111 upstream_actor_id: vec![],
112 upstream_fragment_id,
113 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
114 fields: stream_node.get_fields().clone(),
115 }
116 }))),
117 identity: "MergeExecutor".to_owned(),
118 ..stream_node.clone()
119 })
120 }
121
122 NodeBody::StreamScan(stream_scan) => {
124 let input = stream_node.get_input();
125 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
126 assert!(input.is_empty());
129 return Ok(stream_node.clone());
130 }
131 assert_eq!(input.len(), 2);
132
133 let merge_node = &input[0];
134 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
135 let batch_plan_node = &input[1];
136 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
137
138 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
140 [&EdgeId::UpstreamExternal {
141 upstream_table_id: stream_scan.table_id.into(),
142 downstream_fragment_id: self.fragment_id,
143 }];
144
145 let is_shuffled_backfill = stream_scan.stream_scan_type
146 == StreamScanType::ArrangementBackfill as i32
147 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
148 if !is_shuffled_backfill {
149 assert!(upstream_no_shuffle_actor.is_some());
150 }
151
152 let upstream_dispatcher_type = if is_shuffled_backfill {
153 DispatcherType::Hash as _
156 } else {
157 DispatcherType::NoShuffle as _
158 };
159
160 let upstream_fragment_id = upstream_fragment_id.as_global_id();
161
162 let input = vec![
163 StreamNode {
165 node_body: Some(NodeBody::Merge(Box::new({
166 #[expect(deprecated)]
167 MergeNode {
168 upstream_actor_id: vec![],
169 upstream_fragment_id,
170 upstream_dispatcher_type,
171 fields: merge_node.fields.clone(),
172 }
173 }))),
174 ..merge_node.clone()
175 },
176 batch_plan_node.clone(),
177 ];
178
179 Ok(StreamNode {
180 input,
181 ..stream_node.clone()
182 })
183 }
184
185 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
189 let input = stream_node.get_input();
190 assert_eq!(input.len(), 1);
191
192 let merge_node = &input[0];
193 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
194
195 let upstream_source_id = match stream_node.get_node_body()? {
196 NodeBody::CdcFilter(node) => node.upstream_source_id,
197 NodeBody::SourceBackfill(node) => node.upstream_source_id,
198 _ => unreachable!(),
199 };
200
201 let (upstream_fragment_id, upstream_actors) = &self.upstreams
203 [&EdgeId::UpstreamExternal {
204 upstream_table_id: upstream_source_id.into(),
205 downstream_fragment_id: self.fragment_id,
206 }];
207
208 assert!(
209 upstream_actors.is_some(),
210 "Upstream Cdc Source should be singleton. \
211 SourceBackfill is NoShuffle 1-1 correspondence. \
212 So they both should have only one upstream actor."
213 );
214
215 let upstream_fragment_id = upstream_fragment_id.as_global_id();
216
217 let input = vec![
219 StreamNode {
221 node_body: Some(NodeBody::Merge(Box::new({
222 #[expect(deprecated)]
223 MergeNode {
224 upstream_actor_id: vec![],
225 upstream_fragment_id,
226 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
227 fields: merge_node.fields.clone(),
228 }
229 }))),
230 ..merge_node.clone()
231 },
232 ];
233 Ok(StreamNode {
234 input,
235 ..stream_node.clone()
236 })
237 }
238
239 _ => {
241 let mut new_stream_node = stream_node.clone();
242 for (input, new_input) in stream_node
243 .input
244 .iter()
245 .zip_eq_fast(&mut new_stream_node.input)
246 {
247 *new_input = self.rewrite_inner(input, depth + 1)?;
248 }
249 Ok(new_stream_node)
250 }
251 }
252 }
253}
254
255impl ActorBuilder {
256 fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
258 #[cfg(not(debug_assertions))]
260 let mview_definition = job.name();
261 #[cfg(debug_assertions)]
262 let mview_definition = job.definition();
263
264 Ok(StreamActor {
265 actor_id: self.actor_id.as_global_id(),
266 fragment_id: self.fragment_id.as_global_id(),
267 vnode_bitmap: self.vnode_bitmap,
268 mview_definition,
269 expr_context: Some(expr_context),
270 })
271 }
272}
273
274#[derive(Default)]
279struct UpstreamFragmentChange {
280 new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
282}
283
284#[derive(Default)]
285struct DownstreamFragmentChange {
286 new_upstreams:
289 HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
290}
291
292impl UpstreamFragmentChange {
293 fn add_dispatcher(
295 &mut self,
296 downstream_fragment_id: GlobalFragmentId,
297 dispatch: DispatchStrategy,
298 ) {
299 self.new_downstreams
300 .try_insert(downstream_fragment_id, dispatch)
301 .unwrap();
302 }
303}
304
305impl DownstreamFragmentChange {
306 fn add_upstream(
308 &mut self,
309 edge_id: DownstreamExternalEdgeId,
310 new_upstream_fragment_id: GlobalFragmentId,
311 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
312 ) {
313 self.new_upstreams
314 .try_insert(
315 edge_id,
316 (new_upstream_fragment_id, no_shuffle_actor_mapping),
317 )
318 .unwrap();
319 }
320}
321
322type ActorLocations = BTreeMap<GlobalActorId, WorkerSlotId>;
324type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
326
327#[derive(Debug)]
328struct FragmentActorBuilder {
329 fragment_id: GlobalFragmentId,
330 node: StreamNode,
331 actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
332 downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
333 upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
335}
336
337impl FragmentActorBuilder {
338 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
339 Self {
340 fragment_id,
341 node,
342 actor_builders: Default::default(),
343 downstreams: Default::default(),
344 upstreams: Default::default(),
345 }
346 }
347}
348
349#[derive(Default)]
356struct ActorGraphBuildStateInner {
357 fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,
359
360 building_locations: ActorLocations,
362
363 downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,
366
367 upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
370
371 external_locations: ActorLocations,
373}
374
375struct FragmentLinkNode<'a> {
377 fragment_id: GlobalFragmentId,
378 actor_ids: &'a [GlobalActorId],
379}
380
381impl ActorGraphBuildStateInner {
382 fn add_actor(
386 &mut self,
387 (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
388 worker_slot_id: WorkerSlotId,
389 vnode_bitmap: Option<Bitmap>,
390 ) {
391 self.fragment_actor_builders
392 .get_mut(&fragment_id)
393 .expect("should added previously")
394 .actor_builders
395 .try_insert(
396 actor_id,
397 ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
398 )
399 .unwrap();
400
401 self.building_locations
402 .try_insert(actor_id, worker_slot_id)
403 .unwrap();
404 }
405
406 fn record_external_location(&mut self, actor_id: GlobalActorId, worker_slot_id: WorkerSlotId) {
408 self.external_locations
409 .try_insert(actor_id, worker_slot_id)
410 .unwrap();
411 }
412
413 fn add_dispatcher(
418 &mut self,
419 fragment_id: GlobalFragmentId,
420 downstream_fragment_id: GlobalFragmentId,
421 dispatch: DispatchStrategy,
422 ) {
423 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
424 builder
425 .downstreams
426 .try_insert(downstream_fragment_id, dispatch)
427 .unwrap();
428 } else {
429 self.upstream_fragment_changes
430 .entry(fragment_id)
431 .or_default()
432 .add_dispatcher(downstream_fragment_id, dispatch);
433 }
434 }
435
436 fn add_upstream(
441 &mut self,
442 fragment_id: GlobalFragmentId,
443 edge_id: EdgeId,
444 upstream_fragment_id: GlobalFragmentId,
445 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
446 ) {
447 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
448 builder
449 .upstreams
450 .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
451 .unwrap();
452 } else {
453 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
454 unreachable!("edge from internal to external must be `DownstreamExternal`")
455 };
456 self.downstream_fragment_changes
457 .entry(fragment_id)
458 .or_default()
459 .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
460 }
461 }
462
463 fn get_location(&self, actor_id: GlobalActorId) -> WorkerSlotId {
466 self.building_locations
467 .get(&actor_id)
468 .copied()
469 .or_else(|| self.external_locations.get(&actor_id).copied())
470 .unwrap()
471 }
472
473 fn add_link<'a>(
482 &mut self,
483 upstream: FragmentLinkNode<'a>,
484 downstream: FragmentLinkNode<'a>,
485 edge: &'a StreamFragmentEdge,
486 ) {
487 let dt = edge.dispatch_strategy.r#type();
488
489 match dt {
490 DispatcherType::NoShuffle => {
492 assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
493 let upstream_locations: HashMap<_, _> = upstream
494 .actor_ids
495 .iter()
496 .map(|id| (self.get_location(*id), *id))
497 .collect();
498 let downstream_locations: HashMap<_, _> = downstream
499 .actor_ids
500 .iter()
501 .map(|id| (self.get_location(*id), *id))
502 .collect();
503
504 self.add_dispatcher(
506 upstream.fragment_id,
507 downstream.fragment_id,
508 edge.dispatch_strategy.clone(),
509 );
510
511 self.add_upstream(
513 downstream.fragment_id,
514 edge.id,
515 upstream.fragment_id,
516 Some(
517 downstream_locations
518 .iter()
519 .map(|(location, downstream_actor_id)| {
520 let upstream_actor_id = upstream_locations.get(location).unwrap();
521 (*upstream_actor_id, *downstream_actor_id)
522 })
523 .collect(),
524 ),
525 );
526 }
527
528 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
530 self.add_dispatcher(
531 upstream.fragment_id,
532 downstream.fragment_id,
533 edge.dispatch_strategy.clone(),
534 );
535 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
536 }
537
538 DispatcherType::Unspecified => unreachable!(),
539 }
540 }
541}
542
543struct ActorGraphBuildState {
545 inner: ActorGraphBuildStateInner,
547
548 fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,
550
551 next_local_id: u32,
553
554 actor_id_gen: GlobalActorIdGen,
556}
557
558impl ActorGraphBuildState {
559 fn new(actor_id_gen: GlobalActorIdGen) -> Self {
561 Self {
562 inner: Default::default(),
563 fragment_actors: Default::default(),
564 next_local_id: 0,
565 actor_id_gen,
566 }
567 }
568
569 fn next_actor_id(&mut self) -> GlobalActorId {
571 let local_id = self.next_local_id;
572 self.next_local_id += 1;
573
574 self.actor_id_gen.to_global_id(local_id)
575 }
576
577 fn finish(self) -> ActorGraphBuildStateInner {
579 assert_eq!(self.actor_id_gen.len(), self.next_local_id);
581
582 self.inner
583 }
584}
585
586pub struct ActorGraphBuildResult {
589 pub graph: BTreeMap<FragmentId, Fragment>,
591 pub downstream_fragment_relations: FragmentDownstreamRelation,
593
594 pub building_locations: Locations,
596
597 pub existing_locations: Locations,
599
600 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
602
603 pub replace_upstream: FragmentReplaceUpstream,
606
607 pub new_no_shuffle: FragmentNewNoShuffle,
611}
612
613#[derive(Debug)]
616pub struct ActorGraphBuilder {
617 distributions: HashMap<GlobalFragmentId, Distribution>,
619
620 existing_distributions: HashMap<GlobalFragmentId, Distribution>,
622
623 fragment_graph: CompleteStreamFragmentGraph,
625
626 cluster_info: StreamingClusterInfo,
628}
629
630impl ActorGraphBuilder {
631 pub fn new(
634 streaming_job_id: u32,
635 resource_group: String,
636 fragment_graph: CompleteStreamFragmentGraph,
637 cluster_info: StreamingClusterInfo,
638 default_parallelism: NonZeroUsize,
639 ) -> MetaResult<Self> {
640 let expected_vnode_count = fragment_graph.max_parallelism();
641 let existing_distributions = fragment_graph.existing_distribution();
642
643 let schedulable_workers =
644 cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
645
646 let scheduler = schedule::Scheduler::new(
647 streaming_job_id,
648 &schedulable_workers,
649 default_parallelism,
650 expected_vnode_count,
651 )?;
652
653 let distributions = scheduler.schedule(&fragment_graph)?;
654
655 let mut fragment_graph = fragment_graph;
657 for (id, fragment) in fragment_graph.building_fragments_mut() {
658 let fragment_vnode_count = distributions[id].vnode_count();
659 visit_tables(fragment, |table, _| {
660 let vnode_count = if table.is_singleton() {
663 if fragment_vnode_count > 1 {
664 tracing::info!(
665 table.name,
666 "found singleton table in hash-distributed fragment"
667 );
668 }
669 1
670 } else {
671 fragment_vnode_count
672 };
673 table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf();
674 })
675 }
676
677 Ok(Self {
678 distributions,
679 existing_distributions,
680 fragment_graph,
681 cluster_info,
682 })
683 }
684
685 fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
688 self.distributions
689 .get(&fragment_id)
690 .or_else(|| self.existing_distributions.get(&fragment_id))
691 .unwrap()
692 }
693
694 fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
696 let actor_locations = actor_locations
697 .into_iter()
698 .map(|(id, worker_slot_id)| (id.as_global_id(), worker_slot_id))
699 .collect();
700
701 let worker_locations = self
702 .cluster_info
703 .worker_nodes
704 .iter()
705 .map(|(id, node)| (*id as WorkerId, node.clone()))
706 .collect();
707
708 Locations {
709 actor_locations,
710 worker_locations,
711 }
712 }
713
714 pub fn generate_graph(
717 self,
718 env: &MetaSrvEnv,
719 job: &StreamingJob,
720 expr_context: ExprContext,
721 ) -> MetaResult<ActorGraphBuildResult> {
722 let actor_len = self
724 .distributions
725 .values()
726 .map(|d| d.parallelism())
727 .sum::<usize>() as u64;
728 let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len);
729
730 let ActorGraphBuildStateInner {
732 fragment_actor_builders,
733 building_locations,
734 downstream_fragment_changes,
735 upstream_fragment_changes,
736 external_locations,
737 } = self.build_actor_graph(id_gen)?;
738
739 for worker_slot_id in external_locations.values() {
740 if self
741 .cluster_info
742 .unschedulable_workers
743 .contains(&worker_slot_id.worker_id())
744 {
745 bail!(
746 "The worker {} where the associated upstream is located is unschedulable",
747 worker_slot_id.worker_id(),
748 );
749 }
750 }
751
752 let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
753 let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
754 let graph = {
756 let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
757 HashMap::new();
758
759 for (fragment_id, builder) in fragment_actor_builders {
762 let global_fragment_id = fragment_id.as_global_id();
763 let node = builder.rewrite()?;
764 for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
765 if let Some(no_shuffle_upstream) = no_shuffle_upstream {
766 new_no_shuffle
767 .entry(upstream_fragment_id.as_global_id())
768 .or_default()
769 .try_insert(
770 global_fragment_id,
771 no_shuffle_upstream
772 .iter()
773 .map(|(upstream_actor_id, actor_id)| {
774 (upstream_actor_id.as_global_id(), actor_id.as_global_id())
775 })
776 .collect(),
777 )
778 .expect("non-duplicate");
779 }
780 }
781 downstream_fragment_relations
782 .try_insert(
783 global_fragment_id,
784 builder
785 .downstreams
786 .into_iter()
787 .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
788 .collect(),
789 )
790 .expect("non-duplicate");
791 fragment_actors
792 .try_insert(
793 fragment_id,
794 (
795 node,
796 builder
797 .actor_builders
798 .into_values()
799 .map(|builder| builder.build(job, expr_context.clone()))
800 .try_collect()?,
801 ),
802 )
803 .expect("non-duplicate");
804 }
805
806 {
807 fragment_actors
808 .into_iter()
809 .map(|(fragment_id, (stream_node, actors))| {
810 let distribution = self.distributions[&fragment_id].clone();
811 let fragment = self.fragment_graph.seal_fragment(
812 fragment_id,
813 actors,
814 distribution,
815 stream_node,
816 );
817 let fragment_id = fragment_id.as_global_id();
818 (fragment_id, fragment)
819 })
820 .collect()
821 }
822 };
823
824 let building_locations = self.build_locations(building_locations);
826 let existing_locations = self.build_locations(external_locations);
827
828 let upstream_fragment_downstreams = upstream_fragment_changes
830 .into_iter()
831 .map(|(fragment_id, changes)| {
832 (
833 fragment_id.as_global_id(),
834 changes
835 .new_downstreams
836 .into_iter()
837 .map(|(downstream_fragment_id, new_dispatch)| {
838 (downstream_fragment_id.as_global_id(), new_dispatch).into()
839 })
840 .collect(),
841 )
842 })
843 .collect();
844
845 let replace_upstream = downstream_fragment_changes
847 .into_iter()
848 .map(|(fragment_id, changes)| {
849 let fragment_id = fragment_id.as_global_id();
850 let new_no_shuffle = &mut new_no_shuffle;
851 (
852 fragment_id,
853 changes
854 .new_upstreams
855 .into_iter()
856 .map(
857 move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
858 let upstream_fragment_id = upstream_fragment_id.as_global_id();
859 if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
860 && !upstream_new_no_shuffle.is_empty()
861 {
862 let no_shuffle_actors = new_no_shuffle
863 .entry(upstream_fragment_id)
864 .or_default()
865 .entry(fragment_id)
866 .or_default();
867 no_shuffle_actors.extend(
868 upstream_new_no_shuffle.into_iter().map(
869 |(upstream_actor_id, actor_id)| {
870 (
871 upstream_actor_id.as_global_id(),
872 actor_id.as_global_id(),
873 )
874 },
875 ),
876 );
877 }
878 let DownstreamExternalEdgeId {
879 original_upstream_fragment_id,
880 ..
881 } = edge_id;
882 (
883 original_upstream_fragment_id.as_global_id(),
884 upstream_fragment_id,
885 )
886 },
887 )
888 .collect(),
889 )
890 })
891 .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
892 .collect();
893
894 Ok(ActorGraphBuildResult {
895 graph,
896 downstream_fragment_relations,
897 building_locations,
898 existing_locations,
899 upstream_fragment_downstreams,
900 replace_upstream,
901 new_no_shuffle,
902 })
903 }
904
905 fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
907 let mut state = ActorGraphBuildState::new(id_gen);
908
909 for fragment_id in self.fragment_graph.topo_order()? {
912 self.build_actor_graph_fragment(fragment_id, &mut state)?;
913 }
914
915 Ok(state.finish())
916 }
917
918 fn build_actor_graph_fragment(
920 &self,
921 fragment_id: GlobalFragmentId,
922 state: &mut ActorGraphBuildState,
923 ) -> MetaResult<()> {
924 let current_fragment = self.fragment_graph.get_fragment(fragment_id);
925 let distribution = self.get_distribution(fragment_id);
926
927 let actor_ids = match current_fragment {
929 EitherFragment::Building(current_fragment) => {
931 let node = current_fragment.node.clone().unwrap();
932 state
933 .inner
934 .fragment_actor_builders
935 .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
936 .expect("non-duplicate");
937 let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
938
939 distribution
940 .worker_slots()
941 .map(|worker_slot| {
942 let actor_id = state.next_actor_id();
943 let vnode_bitmap = bitmaps
944 .as_ref()
945 .map(|m: &HashMap<WorkerSlotId, Bitmap>| &m[&worker_slot])
946 .cloned();
947
948 state
949 .inner
950 .add_actor((fragment_id, actor_id), worker_slot, vnode_bitmap);
951
952 actor_id
953 })
954 .collect_vec()
955 }
956
957 EitherFragment::Existing(existing_fragment) => existing_fragment
959 .actors
960 .iter()
961 .map(|a| {
962 let actor_id = GlobalActorId::new(a.actor_id);
963 let worker_slot_id = match &distribution {
964 Distribution::Singleton(worker_slot_id) => *worker_slot_id,
965 Distribution::Hash(mapping) => mapping
966 .get_matched(a.vnode_bitmap.as_ref().unwrap())
967 .unwrap(),
968 };
969
970 state
971 .inner
972 .record_external_location(actor_id, worker_slot_id);
973
974 actor_id
975 })
976 .collect_vec(),
977 };
978
979 for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
981 let downstream_actors = state
982 .fragment_actors
983 .get(&downstream_fragment_id)
984 .expect("downstream fragment not processed yet");
985
986 state.inner.add_link(
987 FragmentLinkNode {
988 fragment_id,
989 actor_ids: &actor_ids,
990 },
991 FragmentLinkNode {
992 fragment_id: downstream_fragment_id,
993 actor_ids: downstream_actors,
994 },
995 edge,
996 );
997 }
998
999 state
1001 .fragment_actors
1002 .try_insert(fragment_id, actor_ids)
1003 .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1004
1005 Ok(())
1006 }
1007}