1use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount, VnodeCountCompat};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::stream_graph_visitor::visit_tables;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::plan_common::ExprContext;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{
29 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
30};
31
32use super::Locations;
33use crate::MetaResult;
34use crate::controller::cluster::StreamingClusterInfo;
35use crate::manager::{MetaSrvEnv, StreamingJob};
36use crate::model::{
37 Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
38 FragmentReplaceUpstream, StreamActor,
39};
40use crate::stream::stream_graph::fragment::{
41 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
42 StreamFragmentEdge,
43};
44use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
45use crate::stream::stream_graph::schedule;
46use crate::stream::stream_graph::schedule::Distribution;
47
48#[derive(Debug)]
50struct ActorBuilder {
51 actor_id: GlobalActorId,
53
54 fragment_id: GlobalFragmentId,
56
57 vnode_bitmap: Option<Bitmap>,
59}
60
61impl ActorBuilder {
62 fn new(
63 actor_id: GlobalActorId,
64 fragment_id: GlobalFragmentId,
65 vnode_bitmap: Option<Bitmap>,
66 ) -> Self {
67 Self {
68 actor_id,
69 fragment_id,
70 vnode_bitmap,
71 }
72 }
73}
74
75impl FragmentActorBuilder {
76 fn rewrite(&self) -> MetaResult<StreamNode> {
82 self.rewrite_inner(&self.node, 0)
83 }
84
85 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
86 match stream_node.get_node_body()? {
87 NodeBody::Exchange(exchange) => {
89 if depth == 0 {
92 bail!(
93 "there should be no ExchangeNode on the top of the plan node: {:#?}",
94 stream_node
95 )
96 }
97 assert!(!stream_node.get_fields().is_empty());
98 assert!(stream_node.input.is_empty());
99
100 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
102 link_id: stream_node.get_operator_id(),
103 }];
104
105 let upstream_fragment_id = upstream_fragment_id.as_global_id();
106
107 Ok(StreamNode {
108 node_body: Some(NodeBody::Merge(Box::new({
109 MergeNode {
110 upstream_fragment_id,
111 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
112 ..Default::default()
113 }
114 }))),
115 identity: "MergeExecutor".to_owned(),
116 ..stream_node.clone()
117 })
118 }
119
120 NodeBody::StreamScan(stream_scan) => {
122 let input = stream_node.get_input();
123 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
124 assert!(input.is_empty());
127 return Ok(stream_node.clone());
128 }
129 assert_eq!(input.len(), 2);
130
131 let merge_node = &input[0];
132 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
133 let batch_plan_node = &input[1];
134 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
135
136 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
138 [&EdgeId::UpstreamExternal {
139 upstream_table_id: stream_scan.table_id.into(),
140 downstream_fragment_id: self.fragment_id,
141 }];
142
143 let is_shuffled_backfill = stream_scan.stream_scan_type
144 == StreamScanType::ArrangementBackfill as i32
145 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
146 if !is_shuffled_backfill {
147 assert!(upstream_no_shuffle_actor.is_some());
148 }
149
150 let upstream_dispatcher_type = if is_shuffled_backfill {
151 DispatcherType::Hash as _
154 } else {
155 DispatcherType::NoShuffle as _
156 };
157
158 let upstream_fragment_id = upstream_fragment_id.as_global_id();
159
160 let input = vec![
161 StreamNode {
163 node_body: Some(NodeBody::Merge(Box::new({
164 MergeNode {
165 upstream_fragment_id,
166 upstream_dispatcher_type,
167 ..Default::default()
168 }
169 }))),
170 ..merge_node.clone()
171 },
172 batch_plan_node.clone(),
173 ];
174
175 Ok(StreamNode {
176 input,
177 ..stream_node.clone()
178 })
179 }
180
181 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
185 let input = stream_node.get_input();
186 assert_eq!(input.len(), 1);
187
188 let merge_node = &input[0];
189 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
190
191 let upstream_source_id = match stream_node.get_node_body()? {
192 NodeBody::CdcFilter(node) => node.upstream_source_id,
193 NodeBody::SourceBackfill(node) => node.upstream_source_id,
194 _ => unreachable!(),
195 };
196
197 let (upstream_fragment_id, upstream_actors) = &self.upstreams
199 [&EdgeId::UpstreamExternal {
200 upstream_table_id: upstream_source_id.into(),
201 downstream_fragment_id: self.fragment_id,
202 }];
203
204 assert!(
205 upstream_actors.is_some(),
206 "Upstream Cdc Source should be singleton. \
207 SourceBackfill is NoShuffle 1-1 correspondence. \
208 So they both should have only one upstream actor."
209 );
210
211 let upstream_fragment_id = upstream_fragment_id.as_global_id();
212
213 let input = vec![
215 StreamNode {
217 node_body: Some(NodeBody::Merge(Box::new({
218 MergeNode {
219 upstream_fragment_id,
220 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
221 ..Default::default()
222 }
223 }))),
224 ..merge_node.clone()
225 },
226 ];
227 Ok(StreamNode {
228 input,
229 ..stream_node.clone()
230 })
231 }
232
233 _ => {
235 let mut new_stream_node = stream_node.clone();
236 for (input, new_input) in stream_node
237 .input
238 .iter()
239 .zip_eq_fast(&mut new_stream_node.input)
240 {
241 *new_input = self.rewrite_inner(input, depth + 1)?;
242 }
243 Ok(new_stream_node)
244 }
245 }
246 }
247}
248
249impl ActorBuilder {
250 fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
252 #[cfg(not(debug_assertions))]
254 let mview_definition = job.name();
255 #[cfg(debug_assertions)]
256 let mview_definition = job.definition();
257
258 Ok(StreamActor {
259 actor_id: self.actor_id.as_global_id(),
260 fragment_id: self.fragment_id.as_global_id(),
261 vnode_bitmap: self.vnode_bitmap,
262 mview_definition,
263 expr_context: Some(expr_context),
264 })
265 }
266}
267
268#[derive(Default)]
273struct UpstreamFragmentChange {
274 new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
276}
277
278#[derive(Default)]
279struct DownstreamFragmentChange {
280 new_upstreams:
283 HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
284}
285
286impl UpstreamFragmentChange {
287 fn add_dispatcher(
289 &mut self,
290 downstream_fragment_id: GlobalFragmentId,
291 dispatch: DispatchStrategy,
292 ) {
293 self.new_downstreams
294 .try_insert(downstream_fragment_id, dispatch)
295 .unwrap();
296 }
297}
298
299impl DownstreamFragmentChange {
300 fn add_upstream(
302 &mut self,
303 edge_id: DownstreamExternalEdgeId,
304 new_upstream_fragment_id: GlobalFragmentId,
305 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
306 ) {
307 self.new_upstreams
308 .try_insert(
309 edge_id,
310 (new_upstream_fragment_id, no_shuffle_actor_mapping),
311 )
312 .unwrap();
313 }
314}
315
316type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
318type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
320
321#[derive(Debug)]
322struct FragmentActorBuilder {
323 fragment_id: GlobalFragmentId,
324 node: StreamNode,
325 actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
326 downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
327 upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
329}
330
331impl FragmentActorBuilder {
332 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
333 Self {
334 fragment_id,
335 node,
336 actor_builders: Default::default(),
337 downstreams: Default::default(),
338 upstreams: Default::default(),
339 }
340 }
341}
342
343#[derive(Default)]
350struct ActorGraphBuildStateInner {
351 fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,
353
354 building_locations: ActorLocations,
356
357 downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,
360
361 upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
364
365 external_locations: ActorLocations,
367}
368
369struct FragmentLinkNode<'a> {
371 fragment_id: GlobalFragmentId,
372 actor_ids: &'a [GlobalActorId],
373}
374
375impl ActorGraphBuildStateInner {
376 fn add_actor(
380 &mut self,
381 (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
382 actor_alignment_id: ActorAlignmentId,
383 vnode_bitmap: Option<Bitmap>,
384 ) {
385 self.fragment_actor_builders
386 .get_mut(&fragment_id)
387 .expect("should added previously")
388 .actor_builders
389 .try_insert(
390 actor_id,
391 ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
392 )
393 .unwrap();
394
395 self.building_locations
396 .try_insert(actor_id, actor_alignment_id)
397 .unwrap();
398 }
399
400 fn record_external_location(
402 &mut self,
403 actor_id: GlobalActorId,
404 actor_alignment_id: ActorAlignmentId,
405 ) {
406 self.external_locations
407 .try_insert(actor_id, actor_alignment_id)
408 .unwrap();
409 }
410
411 fn add_dispatcher(
416 &mut self,
417 fragment_id: GlobalFragmentId,
418 downstream_fragment_id: GlobalFragmentId,
419 dispatch: DispatchStrategy,
420 ) {
421 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
422 builder
423 .downstreams
424 .try_insert(downstream_fragment_id, dispatch)
425 .unwrap();
426 } else {
427 self.upstream_fragment_changes
428 .entry(fragment_id)
429 .or_default()
430 .add_dispatcher(downstream_fragment_id, dispatch);
431 }
432 }
433
434 fn add_upstream(
439 &mut self,
440 fragment_id: GlobalFragmentId,
441 edge_id: EdgeId,
442 upstream_fragment_id: GlobalFragmentId,
443 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
444 ) {
445 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
446 builder
447 .upstreams
448 .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
449 .unwrap();
450 } else {
451 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
452 unreachable!("edge from internal to external must be `DownstreamExternal`")
453 };
454 self.downstream_fragment_changes
455 .entry(fragment_id)
456 .or_default()
457 .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
458 }
459 }
460
461 fn get_location(&self, actor_id: GlobalActorId) -> ActorAlignmentId {
464 self.building_locations
465 .get(&actor_id)
466 .copied()
467 .or_else(|| self.external_locations.get(&actor_id).copied())
468 .unwrap()
469 }
470
471 fn add_link<'a>(
480 &mut self,
481 upstream: FragmentLinkNode<'a>,
482 downstream: FragmentLinkNode<'a>,
483 edge: &'a StreamFragmentEdge,
484 ) {
485 let dt = edge.dispatch_strategy.r#type();
486
487 match dt {
488 DispatcherType::NoShuffle => {
490 assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
491 let upstream_locations: HashMap<_, _> = upstream
492 .actor_ids
493 .iter()
494 .map(|id| (self.get_location(*id), *id))
495 .collect();
496 let downstream_locations: HashMap<_, _> = downstream
497 .actor_ids
498 .iter()
499 .map(|id| (self.get_location(*id), *id))
500 .collect();
501
502 self.add_dispatcher(
504 upstream.fragment_id,
505 downstream.fragment_id,
506 edge.dispatch_strategy.clone(),
507 );
508
509 self.add_upstream(
511 downstream.fragment_id,
512 edge.id,
513 upstream.fragment_id,
514 Some(
515 downstream_locations
516 .iter()
517 .map(|(location, downstream_actor_id)| {
518 let upstream_actor_id = upstream_locations.get(location).unwrap();
519 (*upstream_actor_id, *downstream_actor_id)
520 })
521 .collect(),
522 ),
523 );
524 }
525
526 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
528 self.add_dispatcher(
529 upstream.fragment_id,
530 downstream.fragment_id,
531 edge.dispatch_strategy.clone(),
532 );
533 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
534 }
535
536 DispatcherType::Unspecified => unreachable!(),
537 }
538 }
539}
540
541struct ActorGraphBuildState {
543 inner: ActorGraphBuildStateInner,
545
546 fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,
548
549 next_local_id: u32,
551
552 actor_id_gen: GlobalActorIdGen,
554}
555
556impl ActorGraphBuildState {
557 fn new(actor_id_gen: GlobalActorIdGen) -> Self {
559 Self {
560 inner: Default::default(),
561 fragment_actors: Default::default(),
562 next_local_id: 0,
563 actor_id_gen,
564 }
565 }
566
567 fn next_actor_id(&mut self) -> GlobalActorId {
569 let local_id = self.next_local_id;
570 self.next_local_id += 1;
571
572 self.actor_id_gen.to_global_id(local_id)
573 }
574
575 fn finish(self) -> ActorGraphBuildStateInner {
577 assert_eq!(self.actor_id_gen.len(), self.next_local_id);
579
580 self.inner
581 }
582}
583
584pub struct ActorGraphBuildResult {
587 pub graph: BTreeMap<FragmentId, Fragment>,
589 pub downstream_fragment_relations: FragmentDownstreamRelation,
592
593 pub building_locations: Locations,
595
596 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
598
599 pub replace_upstream: FragmentReplaceUpstream,
602
603 pub new_no_shuffle: FragmentNewNoShuffle,
607}
608
609#[derive(Debug)]
612pub struct ActorGraphBuilder {
613 distributions: HashMap<GlobalFragmentId, Distribution>,
615
616 existing_distributions: HashMap<GlobalFragmentId, Distribution>,
618
619 fragment_graph: CompleteStreamFragmentGraph,
621
622 cluster_info: StreamingClusterInfo,
624}
625
626impl ActorGraphBuilder {
627 pub fn new(
630 streaming_job_id: u32,
631 resource_group: String,
632 fragment_graph: CompleteStreamFragmentGraph,
633 cluster_info: StreamingClusterInfo,
634 default_parallelism: NonZeroUsize,
635 ) -> MetaResult<Self> {
636 let expected_vnode_count = fragment_graph.max_parallelism();
637 let existing_distributions = fragment_graph.existing_distribution();
638
639 let schedulable_workers =
640 cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
641
642 let scheduler = schedule::Scheduler::new(
643 streaming_job_id,
644 &schedulable_workers,
645 default_parallelism,
646 expected_vnode_count,
647 )?;
648
649 let distributions = scheduler.schedule(&fragment_graph)?;
650
651 let mut fragment_graph = fragment_graph;
653 for (id, fragment) in fragment_graph.building_fragments_mut() {
654 let mut error = None;
655 let fragment_vnode_count = distributions[id].vnode_count();
656 visit_tables(fragment, |table, _| {
657 if error.is_some() {
658 return;
659 }
660 let vnode_count = if table.is_singleton() {
663 if fragment_vnode_count > 1 {
664 tracing::info!(
665 table.name,
666 "found singleton table in hash-distributed fragment"
667 );
668 }
669 1
670 } else {
671 fragment_vnode_count
672 };
673 match table.vnode_count_inner().value_opt() {
674 Some(required_vnode_count) if required_vnode_count != vnode_count => {
681 error = Some(format!(
682 "failed to align vnode count for table {}({}): required {}, but got {}",
683 table.id, table.name, required_vnode_count, vnode_count
684 ));
685 }
686 _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
688 }
689 });
690 if let Some(error) = error {
691 bail!(error);
692 }
693 }
694
695 Ok(Self {
696 distributions,
697 existing_distributions,
698 fragment_graph,
699 cluster_info,
700 })
701 }
702
703 fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
706 self.distributions
707 .get(&fragment_id)
708 .or_else(|| self.existing_distributions.get(&fragment_id))
709 .unwrap()
710 }
711
712 fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
714 let actor_locations = actor_locations
715 .into_iter()
716 .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
717 .collect();
718
719 let worker_locations = self
720 .cluster_info
721 .worker_nodes
722 .iter()
723 .map(|(id, node)| (*id as WorkerId, node.clone()))
724 .collect();
725
726 Locations {
727 actor_locations,
728 worker_locations,
729 }
730 }
731
732 pub fn generate_graph(
735 self,
736 env: &MetaSrvEnv,
737 job: &StreamingJob,
738 expr_context: ExprContext,
739 ) -> MetaResult<ActorGraphBuildResult> {
740 let actor_len = self
742 .distributions
743 .values()
744 .map(|d| d.parallelism())
745 .sum::<usize>() as u64;
746 let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len);
747
748 let ActorGraphBuildStateInner {
750 fragment_actor_builders,
751 building_locations,
752 downstream_fragment_changes,
753 upstream_fragment_changes,
754 external_locations,
755 } = self.build_actor_graph(id_gen)?;
756
757 for alignment_id in external_locations.values() {
758 if self
759 .cluster_info
760 .unschedulable_workers
761 .contains(&alignment_id.worker_id())
762 {
763 bail!(
764 "The worker {} where the associated upstream is located is unschedulable",
765 alignment_id.worker_id(),
766 );
767 }
768 }
769
770 let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
771 let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
772 let graph = {
774 let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
775 HashMap::new();
776
777 for (fragment_id, builder) in fragment_actor_builders {
780 let global_fragment_id = fragment_id.as_global_id();
781 let node = builder.rewrite()?;
782 for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
783 if let Some(no_shuffle_upstream) = no_shuffle_upstream {
784 new_no_shuffle
785 .entry(upstream_fragment_id.as_global_id())
786 .or_default()
787 .try_insert(
788 global_fragment_id,
789 no_shuffle_upstream
790 .iter()
791 .map(|(upstream_actor_id, actor_id)| {
792 (upstream_actor_id.as_global_id(), actor_id.as_global_id())
793 })
794 .collect(),
795 )
796 .expect("non-duplicate");
797 }
798 }
799 downstream_fragment_relations
800 .try_insert(
801 global_fragment_id,
802 builder
803 .downstreams
804 .into_iter()
805 .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
806 .collect(),
807 )
808 .expect("non-duplicate");
809 fragment_actors
810 .try_insert(
811 fragment_id,
812 (
813 node,
814 builder
815 .actor_builders
816 .into_values()
817 .map(|builder| builder.build(job, expr_context.clone()))
818 .try_collect()?,
819 ),
820 )
821 .expect("non-duplicate");
822 }
823
824 {
825 fragment_actors
826 .into_iter()
827 .map(|(fragment_id, (stream_node, actors))| {
828 let distribution = self.distributions[&fragment_id].clone();
829 let fragment = self.fragment_graph.seal_fragment(
830 fragment_id,
831 actors,
832 distribution,
833 stream_node,
834 );
835 let fragment_id = fragment_id.as_global_id();
836 (fragment_id, fragment)
837 })
838 .collect()
839 }
840 };
841
842 let building_locations = self.build_locations(building_locations);
844
845 let upstream_fragment_downstreams = upstream_fragment_changes
847 .into_iter()
848 .map(|(fragment_id, changes)| {
849 (
850 fragment_id.as_global_id(),
851 changes
852 .new_downstreams
853 .into_iter()
854 .map(|(downstream_fragment_id, new_dispatch)| {
855 (downstream_fragment_id.as_global_id(), new_dispatch).into()
856 })
857 .collect(),
858 )
859 })
860 .collect();
861
862 let replace_upstream = downstream_fragment_changes
864 .into_iter()
865 .map(|(fragment_id, changes)| {
866 let fragment_id = fragment_id.as_global_id();
867 let new_no_shuffle = &mut new_no_shuffle;
868 (
869 fragment_id,
870 changes
871 .new_upstreams
872 .into_iter()
873 .map(
874 move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
875 let upstream_fragment_id = upstream_fragment_id.as_global_id();
876 if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
877 && !upstream_new_no_shuffle.is_empty()
878 {
879 let no_shuffle_actors = new_no_shuffle
880 .entry(upstream_fragment_id)
881 .or_default()
882 .entry(fragment_id)
883 .or_default();
884 no_shuffle_actors.extend(
885 upstream_new_no_shuffle.into_iter().map(
886 |(upstream_actor_id, actor_id)| {
887 (
888 upstream_actor_id.as_global_id(),
889 actor_id.as_global_id(),
890 )
891 },
892 ),
893 );
894 }
895 let DownstreamExternalEdgeId {
896 original_upstream_fragment_id,
897 ..
898 } = edge_id;
899 (
900 original_upstream_fragment_id.as_global_id(),
901 upstream_fragment_id,
902 )
903 },
904 )
905 .collect(),
906 )
907 })
908 .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
909 .collect();
910
911 Ok(ActorGraphBuildResult {
912 graph,
913 downstream_fragment_relations,
914 building_locations,
915 upstream_fragment_downstreams,
916 replace_upstream,
917 new_no_shuffle,
918 })
919 }
920
921 fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
923 let mut state = ActorGraphBuildState::new(id_gen);
924
925 for fragment_id in self.fragment_graph.topo_order()? {
928 self.build_actor_graph_fragment(fragment_id, &mut state)?;
929 }
930
931 Ok(state.finish())
932 }
933
934 fn build_actor_graph_fragment(
936 &self,
937 fragment_id: GlobalFragmentId,
938 state: &mut ActorGraphBuildState,
939 ) -> MetaResult<()> {
940 let current_fragment = self.fragment_graph.get_fragment(fragment_id);
941 let distribution = self.get_distribution(fragment_id);
942
943 let actor_ids = match current_fragment {
945 EitherFragment::Building(current_fragment) => {
947 let node = current_fragment.node.clone().unwrap();
948 state
949 .inner
950 .fragment_actor_builders
951 .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
952 .expect("non-duplicate");
953 let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
954
955 distribution
956 .actors()
957 .map(|alignment_id| {
958 let actor_id = state.next_actor_id();
959 let vnode_bitmap = bitmaps
960 .as_ref()
961 .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
962 .cloned();
963
964 state
965 .inner
966 .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
967
968 actor_id
969 })
970 .collect_vec()
971 }
972
973 EitherFragment::Existing(existing_fragment) => existing_fragment
975 .actors
976 .iter()
977 .map(|a| {
978 let actor_id = GlobalActorId::new(a.actor_id);
979 let alignment_id = match &distribution {
980 Distribution::Singleton(worker_id) => {
981 ActorAlignmentId::new_single(*worker_id as u32)
982 }
983 Distribution::Hash(mapping) => mapping
984 .get_matched(a.vnode_bitmap.as_ref().unwrap())
985 .unwrap(),
986 };
987
988 state.inner.record_external_location(actor_id, alignment_id);
989
990 actor_id
991 })
992 .collect_vec(),
993 };
994
995 for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
997 let downstream_actors = state
998 .fragment_actors
999 .get(&downstream_fragment_id)
1000 .expect("downstream fragment not processed yet");
1001
1002 state.inner.add_link(
1003 FragmentLinkNode {
1004 fragment_id,
1005 actor_ids: &actor_ids,
1006 },
1007 FragmentLinkNode {
1008 fragment_id: downstream_fragment_id,
1009 actor_ids: downstream_actors,
1010 },
1011 edge,
1012 );
1013 }
1014
1015 state
1017 .fragment_actors
1018 .try_insert(fragment_id, actor_ids)
1019 .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1020
1021 Ok(())
1022 }
1023}