1use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::stream_graph_visitor::visit_tables;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::plan_common::ExprContext;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{
29 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
30};
31
32use super::Locations;
33use crate::MetaResult;
34use crate::controller::cluster::StreamingClusterInfo;
35use crate::manager::{MetaSrvEnv, StreamingJob};
36use crate::model::{
37 Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
38 FragmentReplaceUpstream, StreamActor,
39};
40use crate::stream::stream_graph::fragment::{
41 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
42 StreamFragmentEdge,
43};
44use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
45use crate::stream::stream_graph::schedule;
46use crate::stream::stream_graph::schedule::Distribution;
47
/// Per-actor state collected while the actor graph is being built. It is turned into a
/// [`StreamActor`] by `ActorBuilder::build`.
#[derive(Debug)]
struct ActorBuilder {
    /// The ID of the actor.
    actor_id: GlobalActorId,

    /// The ID of the fragment that this actor is registered under.
    fragment_id: GlobalFragmentId,

    /// The vnode bitmap of the actor. It is `None` when the distribution of the fragment has no
    /// hash mapping to derive a bitmap from.
    vnode_bitmap: Option<Bitmap>,
}
60
61impl ActorBuilder {
62 fn new(
63 actor_id: GlobalActorId,
64 fragment_id: GlobalFragmentId,
65 vnode_bitmap: Option<Bitmap>,
66 ) -> Self {
67 Self {
68 actor_id,
69 fragment_id,
70 vnode_bitmap,
71 }
72 }
73}
74
75impl FragmentActorBuilder {
76 fn rewrite(&self) -> MetaResult<StreamNode> {
82 self.rewrite_inner(&self.node, 0)
83 }
84
85 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
86 match stream_node.get_node_body()? {
87 NodeBody::Exchange(exchange) => {
89 if depth == 0 {
92 bail!(
93 "there should be no ExchangeNode on the top of the plan node: {:#?}",
94 stream_node
95 )
96 }
97 assert!(!stream_node.get_fields().is_empty());
98 assert!(stream_node.input.is_empty());
99
100 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
102 link_id: stream_node.get_operator_id(),
103 }];
104
105 let upstream_fragment_id = upstream_fragment_id.as_global_id();
106
107 Ok(StreamNode {
108 node_body: Some(NodeBody::Merge(Box::new({
109 #[expect(deprecated)]
110 MergeNode {
111 upstream_actor_id: vec![],
112 upstream_fragment_id,
113 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
114 fields: stream_node.get_fields().clone(),
115 }
116 }))),
117 identity: "MergeExecutor".to_owned(),
118 ..stream_node.clone()
119 })
120 }
121
122 NodeBody::StreamScan(stream_scan) => {
124 let input = stream_node.get_input();
125 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
126 assert!(input.is_empty());
129 return Ok(stream_node.clone());
130 }
131 assert_eq!(input.len(), 2);
132
133 let merge_node = &input[0];
134 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
135 let batch_plan_node = &input[1];
136 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
137
138 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
140 [&EdgeId::UpstreamExternal {
141 upstream_table_id: stream_scan.table_id.into(),
142 downstream_fragment_id: self.fragment_id,
143 }];
144
145 let is_shuffled_backfill = stream_scan.stream_scan_type
146 == StreamScanType::ArrangementBackfill as i32
147 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
148 if !is_shuffled_backfill {
149 assert!(upstream_no_shuffle_actor.is_some());
150 }
151
152 let upstream_dispatcher_type = if is_shuffled_backfill {
153 DispatcherType::Hash as _
156 } else {
157 DispatcherType::NoShuffle as _
158 };
159
160 let upstream_fragment_id = upstream_fragment_id.as_global_id();
161
162 let input = vec![
163 StreamNode {
165 node_body: Some(NodeBody::Merge(Box::new({
166 #[expect(deprecated)]
167 MergeNode {
168 upstream_actor_id: vec![],
169 upstream_fragment_id,
170 upstream_dispatcher_type,
171 fields: merge_node.fields.clone(),
172 }
173 }))),
174 ..merge_node.clone()
175 },
176 batch_plan_node.clone(),
177 ];
178
179 Ok(StreamNode {
180 input,
181 ..stream_node.clone()
182 })
183 }
184
185 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
189 let input = stream_node.get_input();
190 assert_eq!(input.len(), 1);
191
192 let merge_node = &input[0];
193 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
194
195 let upstream_source_id = match stream_node.get_node_body()? {
196 NodeBody::CdcFilter(node) => node.upstream_source_id,
197 NodeBody::SourceBackfill(node) => node.upstream_source_id,
198 _ => unreachable!(),
199 };
200
201 let (upstream_fragment_id, upstream_actors) = &self.upstreams
203 [&EdgeId::UpstreamExternal {
204 upstream_table_id: upstream_source_id.into(),
205 downstream_fragment_id: self.fragment_id,
206 }];
207
208 assert!(
209 upstream_actors.is_some(),
210 "Upstream Cdc Source should be singleton. \
211 SourceBackfill is NoShuffle 1-1 correspondence. \
212 So they both should have only one upstream actor."
213 );
214
215 let upstream_fragment_id = upstream_fragment_id.as_global_id();
216
217 let input = vec![
219 StreamNode {
221 node_body: Some(NodeBody::Merge(Box::new({
222 #[expect(deprecated)]
223 MergeNode {
224 upstream_actor_id: vec![],
225 upstream_fragment_id,
226 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
227 fields: merge_node.fields.clone(),
228 }
229 }))),
230 ..merge_node.clone()
231 },
232 ];
233 Ok(StreamNode {
234 input,
235 ..stream_node.clone()
236 })
237 }
238
239 _ => {
241 let mut new_stream_node = stream_node.clone();
242 for (input, new_input) in stream_node
243 .input
244 .iter()
245 .zip_eq_fast(&mut new_stream_node.input)
246 {
247 *new_input = self.rewrite_inner(input, depth + 1)?;
248 }
249 Ok(new_stream_node)
250 }
251 }
252 }
253}
254
255impl ActorBuilder {
256 fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
258 #[cfg(not(debug_assertions))]
260 let mview_definition = job.name();
261 #[cfg(debug_assertions)]
262 let mview_definition = job.definition();
263
264 Ok(StreamActor {
265 actor_id: self.actor_id.as_global_id(),
266 fragment_id: self.fragment_id.as_global_id(),
267 vnode_bitmap: self.vnode_bitmap,
268 mview_definition,
269 expr_context: Some(expr_context),
270 })
271 }
272}
273
/// The changes to apply to a fragment that has no builder in the graph being built and acts as
/// the upstream of a linked fragment.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The dispatch strategies to add, keyed by the ID of the downstream fragment.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
283
/// The changes to apply to a fragment that has no builder in the graph being built and acts as
/// the downstream of a linked fragment.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstream of each external edge: the ID of the new upstream fragment and, for
    /// `NoShuffle` edges, the upstream-to-downstream actor mapping.
    new_upstreams:
        HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
291
292impl UpstreamFragmentChange {
293 fn add_dispatcher(
295 &mut self,
296 downstream_fragment_id: GlobalFragmentId,
297 dispatch: DispatchStrategy,
298 ) {
299 self.new_downstreams
300 .try_insert(downstream_fragment_id, dispatch)
301 .unwrap();
302 }
303}
304
305impl DownstreamFragmentChange {
306 fn add_upstream(
308 &mut self,
309 edge_id: DownstreamExternalEdgeId,
310 new_upstream_fragment_id: GlobalFragmentId,
311 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
312 ) {
313 self.new_upstreams
314 .try_insert(
315 edge_id,
316 (new_upstream_fragment_id, no_shuffle_actor_mapping),
317 )
318 .unwrap();
319 }
320}
321
/// The location (actor alignment ID) of each actor, keyed by the actor ID.
type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
/// The actor pairing of a `NoShuffle` edge. As built by `ActorGraphBuildStateInner::add_link`,
/// each entry maps an upstream actor ID to the downstream actor ID at the same location.
type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
326
/// Everything collected for one fragment being built: its stream node, the builders of its
/// actors, and its edges to other fragments.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The ID of the fragment.
    fragment_id: GlobalFragmentId,
    /// The root stream node of the fragment, which `rewrite` takes as its input.
    node: StreamNode,
    /// The builders of the actors of the fragment, keyed by actor ID.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// The dispatch strategy towards each downstream fragment.
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    /// The upstream fragment of each incoming edge, plus the upstream-to-downstream actor
    /// mapping when the edge is `NoShuffle`.
    upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
336
337impl FragmentActorBuilder {
338 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
339 Self {
340 fragment_id,
341 node,
342 actor_builders: Default::default(),
343 downstreams: Default::default(),
344 upstreams: Default::default(),
345 }
346 }
347}
348
/// The mutable state accumulated while building the actor graph. It is the part of
/// [`ActorGraphBuildState`] that is returned by `ActorGraphBuildState::finish`.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the fragments being built, keyed by fragment ID.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The locations of the actors registered through `add_actor`.
    building_locations: ActorLocations,

    /// The new upstreams of fragments without a builder, keyed by the ID of that (downstream)
    /// fragment.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The new dispatchers of fragments without a builder, keyed by the ID of that (upstream)
    /// fragment.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// The locations of the actors registered through `record_external_location`.
    external_locations: ActorLocations,
}
374
/// One side (upstream or downstream) of a link between two fragments, as passed to
/// `ActorGraphBuildStateInner::add_link`.
struct FragmentLinkNode<'a> {
    /// The ID of the fragment on this side of the link.
    fragment_id: GlobalFragmentId,
    /// The IDs of the actors of that fragment.
    actor_ids: &'a [GlobalActorId],
}
380
381impl ActorGraphBuildStateInner {
382 fn add_actor(
386 &mut self,
387 (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
388 actor_alignment_id: ActorAlignmentId,
389 vnode_bitmap: Option<Bitmap>,
390 ) {
391 self.fragment_actor_builders
392 .get_mut(&fragment_id)
393 .expect("should added previously")
394 .actor_builders
395 .try_insert(
396 actor_id,
397 ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
398 )
399 .unwrap();
400
401 self.building_locations
402 .try_insert(actor_id, actor_alignment_id)
403 .unwrap();
404 }
405
406 fn record_external_location(
408 &mut self,
409 actor_id: GlobalActorId,
410 actor_alignment_id: ActorAlignmentId,
411 ) {
412 self.external_locations
413 .try_insert(actor_id, actor_alignment_id)
414 .unwrap();
415 }
416
417 fn add_dispatcher(
422 &mut self,
423 fragment_id: GlobalFragmentId,
424 downstream_fragment_id: GlobalFragmentId,
425 dispatch: DispatchStrategy,
426 ) {
427 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
428 builder
429 .downstreams
430 .try_insert(downstream_fragment_id, dispatch)
431 .unwrap();
432 } else {
433 self.upstream_fragment_changes
434 .entry(fragment_id)
435 .or_default()
436 .add_dispatcher(downstream_fragment_id, dispatch);
437 }
438 }
439
440 fn add_upstream(
445 &mut self,
446 fragment_id: GlobalFragmentId,
447 edge_id: EdgeId,
448 upstream_fragment_id: GlobalFragmentId,
449 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
450 ) {
451 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
452 builder
453 .upstreams
454 .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
455 .unwrap();
456 } else {
457 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
458 unreachable!("edge from internal to external must be `DownstreamExternal`")
459 };
460 self.downstream_fragment_changes
461 .entry(fragment_id)
462 .or_default()
463 .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
464 }
465 }
466
467 fn get_location(&self, actor_id: GlobalActorId) -> ActorAlignmentId {
470 self.building_locations
471 .get(&actor_id)
472 .copied()
473 .or_else(|| self.external_locations.get(&actor_id).copied())
474 .unwrap()
475 }
476
477 fn add_link<'a>(
486 &mut self,
487 upstream: FragmentLinkNode<'a>,
488 downstream: FragmentLinkNode<'a>,
489 edge: &'a StreamFragmentEdge,
490 ) {
491 let dt = edge.dispatch_strategy.r#type();
492
493 match dt {
494 DispatcherType::NoShuffle => {
496 assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
497 let upstream_locations: HashMap<_, _> = upstream
498 .actor_ids
499 .iter()
500 .map(|id| (self.get_location(*id), *id))
501 .collect();
502 let downstream_locations: HashMap<_, _> = downstream
503 .actor_ids
504 .iter()
505 .map(|id| (self.get_location(*id), *id))
506 .collect();
507
508 self.add_dispatcher(
510 upstream.fragment_id,
511 downstream.fragment_id,
512 edge.dispatch_strategy.clone(),
513 );
514
515 self.add_upstream(
517 downstream.fragment_id,
518 edge.id,
519 upstream.fragment_id,
520 Some(
521 downstream_locations
522 .iter()
523 .map(|(location, downstream_actor_id)| {
524 let upstream_actor_id = upstream_locations.get(location).unwrap();
525 (*upstream_actor_id, *downstream_actor_id)
526 })
527 .collect(),
528 ),
529 );
530 }
531
532 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
534 self.add_dispatcher(
535 upstream.fragment_id,
536 downstream.fragment_id,
537 edge.dispatch_strategy.clone(),
538 );
539 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
540 }
541
542 DispatcherType::Unspecified => unreachable!(),
543 }
544 }
545}
546
/// The state of the actor graph build: the accumulated result plus the bookkeeping needed while
/// fragments are being processed.
struct ActorGraphBuildState {
    /// The accumulated state, returned by `finish`.
    inner: ActorGraphBuildStateInner,

    /// The actor IDs of every fragment processed so far, keyed by fragment ID.
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// The local ID that the next call to `next_actor_id` will use.
    next_local_id: u32,

    /// The generator that converts local actor IDs into global ones.
    actor_id_gen: GlobalActorIdGen,
}
561
562impl ActorGraphBuildState {
563 fn new(actor_id_gen: GlobalActorIdGen) -> Self {
565 Self {
566 inner: Default::default(),
567 fragment_actors: Default::default(),
568 next_local_id: 0,
569 actor_id_gen,
570 }
571 }
572
573 fn next_actor_id(&mut self) -> GlobalActorId {
575 let local_id = self.next_local_id;
576 self.next_local_id += 1;
577
578 self.actor_id_gen.to_global_id(local_id)
579 }
580
581 fn finish(self) -> ActorGraphBuildStateInner {
583 assert_eq!(self.actor_id_gen.len(), self.next_local_id);
585
586 self.inner
587 }
588}
589
/// The result of `ActorGraphBuilder::generate_graph`.
pub struct ActorGraphBuildResult {
    /// The sealed fragments that were built, keyed by fragment ID.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream relations of every built fragment, keyed by the ID of the built fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The locations of the actors that were built.
    pub building_locations: Locations,

    /// The locations of the external actors (actors of existing fragments) that were recorded
    /// during the build.
    pub existing_locations: Locations,

    /// The new downstream relations to add to upstream fragments that were not built here,
    /// keyed by the ID of that upstream fragment.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// For each downstream fragment that was not built here, the replacement of its upstreams:
    /// original upstream fragment ID mapped to new upstream fragment ID.
    pub replace_upstream: FragmentReplaceUpstream,

    /// The actor mappings of the new `NoShuffle` edges, indexed by upstream fragment ID, then
    /// downstream fragment ID; each mapping goes from upstream actor ID to downstream actor ID.
    pub new_no_shuffle: FragmentNewNoShuffle,
}
616
/// Builds the actor graph of a streaming job from its complete fragment graph, see
/// `ActorGraphBuilder::generate_graph`.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The distribution of each fragment, as decided by the scheduler in `new`.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The distributions reported by `existing_distribution()` of the fragment graph.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph, with the vnode counts of its tables filled in by `new`.
    fragment_graph: CompleteStreamFragmentGraph,

    /// The cluster information used for scheduling and for resolving worker locations.
    cluster_info: StreamingClusterInfo,
}
633
634impl ActorGraphBuilder {
635 pub fn new(
638 streaming_job_id: u32,
639 resource_group: String,
640 fragment_graph: CompleteStreamFragmentGraph,
641 cluster_info: StreamingClusterInfo,
642 default_parallelism: NonZeroUsize,
643 ) -> MetaResult<Self> {
644 let expected_vnode_count = fragment_graph.max_parallelism();
645 let existing_distributions = fragment_graph.existing_distribution();
646
647 let schedulable_workers =
648 cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
649
650 let scheduler = schedule::Scheduler::new(
651 streaming_job_id,
652 &schedulable_workers,
653 default_parallelism,
654 expected_vnode_count,
655 )?;
656
657 let distributions = scheduler.schedule(&fragment_graph)?;
658
659 let mut fragment_graph = fragment_graph;
661 for (id, fragment) in fragment_graph.building_fragments_mut() {
662 let fragment_vnode_count = distributions[id].vnode_count();
663 visit_tables(fragment, |table, _| {
664 let vnode_count = if table.is_singleton() {
667 if fragment_vnode_count > 1 {
668 tracing::info!(
669 table.name,
670 "found singleton table in hash-distributed fragment"
671 );
672 }
673 1
674 } else {
675 fragment_vnode_count
676 };
677 table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf();
678 })
679 }
680
681 Ok(Self {
682 distributions,
683 existing_distributions,
684 fragment_graph,
685 cluster_info,
686 })
687 }
688
689 fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
692 self.distributions
693 .get(&fragment_id)
694 .or_else(|| self.existing_distributions.get(&fragment_id))
695 .unwrap()
696 }
697
698 fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
700 let actor_locations = actor_locations
701 .into_iter()
702 .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
703 .collect();
704
705 let worker_locations = self
706 .cluster_info
707 .worker_nodes
708 .iter()
709 .map(|(id, node)| (*id as WorkerId, node.clone()))
710 .collect();
711
712 Locations {
713 actor_locations,
714 worker_locations,
715 }
716 }
717
    /// Builds the actor graph and converts it into an [`ActorGraphBuildResult`].
    ///
    /// Actor IDs are allocated for all scheduled distributions, the actor graph is built
    /// fragment by fragment, and every built fragment is then rewritten and sealed. The changes
    /// that must be applied to fragments not built here are returned alongside.
    ///
    /// # Errors
    /// Fails if building the actor graph fails, if an external actor is located on an
    /// unschedulable worker, if rewriting a stream node fails, or if building an actor fails.
    ///
    /// # Panics
    /// Panics on duplicate entries, which the `expect("non-duplicate")` calls treat as
    /// impossible.
    pub fn generate_graph(
        self,
        env: &MetaSrvEnv,
        job: &StreamingJob,
        expr_context: ExprContext,
    ) -> MetaResult<ActorGraphBuildResult> {
        // The number of actors to build is the total parallelism of the scheduled
        // distributions; the ID generator is created with this count.
        let actor_len = self
            .distributions
            .values()
            .map(|d| d.parallelism())
            .sum::<usize>() as u64;
        let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len);

        // Build the actor graph and take the accumulated state apart.
        let ActorGraphBuildStateInner {
            fragment_actor_builders,
            building_locations,
            downstream_fragment_changes,
            upstream_fragment_changes,
            external_locations,
        } = self.build_actor_graph(id_gen)?;

        // Refuse to link to an external actor that lives on an unschedulable worker.
        for alignment_id in external_locations.values() {
            if self
                .cluster_info
                .unschedulable_workers
                .contains(&alignment_id.worker_id())
            {
                bail!(
                    "The worker {} where the associated upstream is located is unschedulable",
                    alignment_id.worker_id(),
                );
            }
        }

        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
        let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
        let graph = {
            // The rewritten stream node and the built actors of each fragment.
            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
                HashMap::new();

            for (fragment_id, builder) in fragment_actor_builders {
                let global_fragment_id = fragment_id.as_global_id();
                // Rewrite first: it borrows the whole builder, whose fields are moved out below.
                let node = builder.rewrite()?;
                // Record the actor mapping of every no-shuffle upstream edge of this fragment.
                for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
                    if let Some(no_shuffle_upstream) = no_shuffle_upstream {
                        new_no_shuffle
                            .entry(upstream_fragment_id.as_global_id())
                            .or_default()
                            .try_insert(
                                global_fragment_id,
                                no_shuffle_upstream
                                    .iter()
                                    .map(|(upstream_actor_id, actor_id)| {
                                        (upstream_actor_id.as_global_id(), actor_id.as_global_id())
                                    })
                                    .collect(),
                            )
                            .expect("non-duplicate");
                    }
                }
                // Record the downstream relations of this fragment.
                downstream_fragment_relations
                    .try_insert(
                        global_fragment_id,
                        builder
                            .downstreams
                            .into_iter()
                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
                            .collect(),
                    )
                    .expect("non-duplicate");
                // Build the actors of this fragment; the first failure aborts the whole build.
                fragment_actors
                    .try_insert(
                        fragment_id,
                        (
                            node,
                            builder
                                .actor_builders
                                .into_values()
                                .map(|builder| builder.build(job, expr_context.clone()))
                                .try_collect()?,
                        ),
                    )
                    .expect("non-duplicate");
            }

            {
                // Seal every built fragment with its actors, its scheduled distribution and its
                // rewritten stream node.
                fragment_actors
                    .into_iter()
                    .map(|(fragment_id, (stream_node, actors))| {
                        let distribution = self.distributions[&fragment_id].clone();
                        let fragment = self.fragment_graph.seal_fragment(
                            fragment_id,
                            actors,
                            distribution,
                            stream_node,
                        );
                        let fragment_id = fragment_id.as_global_id();
                        (fragment_id, fragment)
                    })
                    .collect()
            }
        };

        // Convert the actor locations into the output format.
        let building_locations = self.build_locations(building_locations);
        let existing_locations = self.build_locations(external_locations);

        // The new dispatchers of upstream fragments that were not built here.
        let upstream_fragment_downstreams = upstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                (
                    fragment_id.as_global_id(),
                    changes
                        .new_downstreams
                        .into_iter()
                        .map(|(downstream_fragment_id, new_dispatch)| {
                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
                        })
                        .collect(),
                )
            })
            .collect();

        // The new upstreams of downstream fragments that were not built here. Non-empty
        // no-shuffle actor mappings of these edges are merged into `new_no_shuffle` on the way.
        let replace_upstream = downstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                let fragment_id = fragment_id.as_global_id();
                // Reborrow so that the `move` closure below does not take `new_no_shuffle` itself.
                let new_no_shuffle = &mut new_no_shuffle;
                (
                    fragment_id,
                    changes
                        .new_upstreams
                        .into_iter()
                        .map(
                            move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
                                let upstream_fragment_id = upstream_fragment_id.as_global_id();
                                if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
                                    && !upstream_new_no_shuffle.is_empty()
                                {
                                    let no_shuffle_actors = new_no_shuffle
                                        .entry(upstream_fragment_id)
                                        .or_default()
                                        .entry(fragment_id)
                                        .or_default();
                                    no_shuffle_actors.extend(
                                        upstream_new_no_shuffle.into_iter().map(
                                            |(upstream_actor_id, actor_id)| {
                                                (
                                                    upstream_actor_id.as_global_id(),
                                                    actor_id.as_global_id(),
                                                )
                                            },
                                        ),
                                    );
                                }
                                // Map the original upstream fragment to its replacement.
                                let DownstreamExternalEdgeId {
                                    original_upstream_fragment_id,
                                    ..
                                } = edge_id;
                                (
                                    original_upstream_fragment_id.as_global_id(),
                                    upstream_fragment_id,
                                )
                            },
                        )
                        .collect(),
                )
            })
            // Drop fragments that end up without any replacement.
            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
            .collect();

        Ok(ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            existing_locations,
            upstream_fragment_downstreams,
            replace_upstream,
            new_no_shuffle,
        })
    }
908
909 fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
911 let mut state = ActorGraphBuildState::new(id_gen);
912
913 for fragment_id in self.fragment_graph.topo_order()? {
916 self.build_actor_graph_fragment(fragment_id, &mut state)?;
917 }
918
919 Ok(state.finish())
920 }
921
922 fn build_actor_graph_fragment(
924 &self,
925 fragment_id: GlobalFragmentId,
926 state: &mut ActorGraphBuildState,
927 ) -> MetaResult<()> {
928 let current_fragment = self.fragment_graph.get_fragment(fragment_id);
929 let distribution = self.get_distribution(fragment_id);
930
931 let actor_ids = match current_fragment {
933 EitherFragment::Building(current_fragment) => {
935 let node = current_fragment.node.clone().unwrap();
936 state
937 .inner
938 .fragment_actor_builders
939 .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
940 .expect("non-duplicate");
941 let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
942
943 distribution
944 .actors()
945 .map(|alignment_id| {
946 let actor_id = state.next_actor_id();
947 let vnode_bitmap = bitmaps
948 .as_ref()
949 .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
950 .cloned();
951
952 state
953 .inner
954 .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
955
956 actor_id
957 })
958 .collect_vec()
959 }
960
961 EitherFragment::Existing(existing_fragment) => existing_fragment
963 .actors
964 .iter()
965 .map(|a| {
966 let actor_id = GlobalActorId::new(a.actor_id);
967 let alignment_id = match &distribution {
968 Distribution::Singleton(worker_id) => {
969 ActorAlignmentId::new_single(*worker_id as u32)
970 }
971 Distribution::Hash(mapping) => mapping
972 .get_matched(a.vnode_bitmap.as_ref().unwrap())
973 .unwrap(),
974 };
975
976 state.inner.record_external_location(actor_id, alignment_id);
977
978 actor_id
979 })
980 .collect_vec(),
981 };
982
983 for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
985 let downstream_actors = state
986 .fragment_actors
987 .get(&downstream_fragment_id)
988 .expect("downstream fragment not processed yet");
989
990 state.inner.add_link(
991 FragmentLinkNode {
992 fragment_id,
993 actor_ids: &actor_ids,
994 },
995 FragmentLinkNode {
996 fragment_id: downstream_fragment_id,
997 actor_ids: downstream_actors,
998 },
999 edge,
1000 );
1001 }
1002
1003 state
1005 .fragment_actors
1006 .try_insert(fragment_id, actor_ids)
1007 .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1008
1009 Ok(())
1010 }
1011}