1use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount, VnodeCountCompat};
23use risingwave_common::id::JobId;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::stream_graph_visitor::visit_tables;
26use risingwave_meta_model::WorkerId;
27use risingwave_pb::plan_common::ExprContext;
28use risingwave_pb::stream_plan::stream_node::NodeBody;
29use risingwave_pb::stream_plan::{
30 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
31};
32
33use super::Locations;
34use crate::MetaResult;
35use crate::controller::cluster::StreamingClusterInfo;
36use crate::manager::{MetaSrvEnv, StreamingJob};
37use crate::model::{
38 Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
39 FragmentReplaceUpstream, StreamActor,
40};
41use crate::stream::stream_graph::fragment::{
42 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
43 StreamFragmentEdge,
44};
45use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
46use crate::stream::stream_graph::schedule;
47use crate::stream::stream_graph::schedule::Distribution;
48
/// Per-actor state collected while the actor graph is being built. It is turned into a
/// [`StreamActor`] by [`ActorBuilder::build`].
#[derive(Debug)]
struct ActorBuilder {
    /// The global id of this actor.
    actor_id: GlobalActorId,

    /// The global id of the fragment this actor belongs to.
    fragment_id: GlobalFragmentId,

    /// The vnode bitmap copied into the built actor, if any.
    vnode_bitmap: Option<Bitmap>,
}
61
62impl ActorBuilder {
63 fn new(
64 actor_id: GlobalActorId,
65 fragment_id: GlobalFragmentId,
66 vnode_bitmap: Option<Bitmap>,
67 ) -> Self {
68 Self {
69 actor_id,
70 fragment_id,
71 vnode_bitmap,
72 }
73 }
74}
75
76impl FragmentActorBuilder {
77 fn rewrite(&self) -> MetaResult<StreamNode> {
83 self.rewrite_inner(&self.node, 0)
84 }
85
86 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
87 match stream_node.get_node_body()? {
88 NodeBody::Exchange(exchange) => {
90 if depth == 0 {
93 bail!(
94 "there should be no ExchangeNode on the top of the plan node: {:#?}",
95 stream_node
96 )
97 }
98 assert!(!stream_node.get_fields().is_empty());
99 assert!(stream_node.input.is_empty());
100
101 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
103 link_id: stream_node.get_operator_id(),
104 }];
105
106 let upstream_fragment_id = upstream_fragment_id.as_global_id();
107
108 Ok(StreamNode {
109 node_body: Some(NodeBody::Merge(Box::new({
110 MergeNode {
111 upstream_fragment_id,
112 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
113 ..Default::default()
114 }
115 }))),
116 identity: "MergeExecutor".to_owned(),
117 ..stream_node.clone()
118 })
119 }
120
121 NodeBody::StreamScan(stream_scan) => {
123 let input = stream_node.get_input();
124 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
125 assert!(input.is_empty());
128 return Ok(stream_node.clone());
129 }
130 assert_eq!(input.len(), 2);
131
132 let merge_node = &input[0];
133 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
134 let batch_plan_node = &input[1];
135 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
136
137 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
139 [&EdgeId::UpstreamExternal {
140 upstream_table_id: stream_scan.table_id.into(),
141 downstream_fragment_id: self.fragment_id,
142 }];
143
144 let is_shuffled_backfill = stream_scan.stream_scan_type
145 == StreamScanType::ArrangementBackfill as i32
146 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
147 if !is_shuffled_backfill {
148 assert!(upstream_no_shuffle_actor.is_some());
149 }
150
151 let upstream_dispatcher_type = if is_shuffled_backfill {
152 DispatcherType::Hash as _
155 } else {
156 DispatcherType::NoShuffle as _
157 };
158
159 let upstream_fragment_id = upstream_fragment_id.as_global_id();
160
161 let input = vec![
162 StreamNode {
164 node_body: Some(NodeBody::Merge(Box::new({
165 MergeNode {
166 upstream_fragment_id,
167 upstream_dispatcher_type,
168 ..Default::default()
169 }
170 }))),
171 ..merge_node.clone()
172 },
173 batch_plan_node.clone(),
174 ];
175
176 Ok(StreamNode {
177 input,
178 ..stream_node.clone()
179 })
180 }
181
182 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
186 let input = stream_node.get_input();
187 assert_eq!(input.len(), 1);
188
189 let merge_node = &input[0];
190 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
191
192 let upstream_source_id = match stream_node.get_node_body()? {
193 NodeBody::CdcFilter(node) => node.upstream_source_id,
194 NodeBody::SourceBackfill(node) => node.upstream_source_id,
195 _ => unreachable!(),
196 };
197
198 let (upstream_fragment_id, upstream_actors) = &self.upstreams
200 [&EdgeId::UpstreamExternal {
201 upstream_table_id: upstream_source_id.into(),
202 downstream_fragment_id: self.fragment_id,
203 }];
204
205 assert!(
206 upstream_actors.is_some(),
207 "Upstream Cdc Source should be singleton. \
208 SourceBackfill is NoShuffle 1-1 correspondence. \
209 So they both should have only one upstream actor."
210 );
211
212 let upstream_fragment_id = upstream_fragment_id.as_global_id();
213
214 let input = vec![
216 StreamNode {
218 node_body: Some(NodeBody::Merge(Box::new({
219 MergeNode {
220 upstream_fragment_id,
221 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
222 ..Default::default()
223 }
224 }))),
225 ..merge_node.clone()
226 },
227 ];
228 Ok(StreamNode {
229 input,
230 ..stream_node.clone()
231 })
232 }
233
234 _ => {
236 let mut new_stream_node = stream_node.clone();
237 for (input, new_input) in stream_node
238 .input
239 .iter()
240 .zip_eq_fast(&mut new_stream_node.input)
241 {
242 *new_input = self.rewrite_inner(input, depth + 1)?;
243 }
244 Ok(new_stream_node)
245 }
246 }
247 }
248}
249
250impl ActorBuilder {
251 fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
253 #[cfg(not(debug_assertions))]
255 let mview_definition = job.name();
256 #[cfg(debug_assertions)]
257 let mview_definition = job.definition();
258
259 Ok(StreamActor {
260 actor_id: self.actor_id.as_global_id(),
261 fragment_id: self.fragment_id.as_global_id(),
262 vnode_bitmap: self.vnode_bitmap,
263 mview_definition,
264 expr_context: Some(expr_context),
265 })
266 }
267}
268
/// Changes recorded for a fragment that has no builder in the current build but gains new
/// downstream fragments from it.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The dispatch strategy towards each newly linked downstream fragment.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
278
/// Changes recorded for a fragment that has no builder in the current build but gains new
/// upstream fragments from it.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// For each external edge: the new upstream fragment, plus the upstream-to-downstream
    /// actor mapping when the edge is no-shuffle (`None` for the other dispatcher types).
    new_upstreams:
        HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
286
287impl UpstreamFragmentChange {
288 fn add_dispatcher(
290 &mut self,
291 downstream_fragment_id: GlobalFragmentId,
292 dispatch: DispatchStrategy,
293 ) {
294 self.new_downstreams
295 .try_insert(downstream_fragment_id, dispatch)
296 .unwrap();
297 }
298}
299
300impl DownstreamFragmentChange {
301 fn add_upstream(
303 &mut self,
304 edge_id: DownstreamExternalEdgeId,
305 new_upstream_fragment_id: GlobalFragmentId,
306 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
307 ) {
308 self.new_upstreams
309 .try_insert(
310 edge_id,
311 (new_upstream_fragment_id, no_shuffle_actor_mapping),
312 )
313 .unwrap();
314 }
315}
316
/// The [`ActorAlignmentId`] each actor is placed on, keyed by actor id.
type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
/// Actor pairs connected by a no-shuffle edge: upstream actor id -> downstream actor id.
type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
321
/// Everything collected for one fragment that is being built: its plan node, its actors and
/// the edges that connect it to other fragments.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The global id of the fragment.
    fragment_id: GlobalFragmentId,
    /// The root plan node of the fragment, before the upstream placeholders are rewritten.
    node: StreamNode,
    /// The builders of the fragment's actors, keyed by actor id.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// The dispatch strategy towards each downstream fragment.
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    /// For each incoming edge: the upstream fragment, plus the upstream-to-downstream actor
    /// mapping when the edge is no-shuffle.
    upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
331
332impl FragmentActorBuilder {
333 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
334 Self {
335 fragment_id,
336 node,
337 actor_builders: Default::default(),
338 downstreams: Default::default(),
339 upstreams: Default::default(),
340 }
341 }
342}
343
/// The mutable state accumulated while the actor graph is built, and the value returned once
/// the build finishes.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the fragments being built, keyed by fragment id.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The locations of the actors created during this build.
    building_locations: ActorLocations,

    /// New upstreams of fragments that have no builder in this build.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// New downstreams (dispatchers) of fragments that have no builder in this build.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// The locations of actors that already exist outside this build.
    external_locations: ActorLocations,
}
369
/// One endpoint of a link between two fragments, as passed to
/// `ActorGraphBuildStateInner::add_link`.
struct FragmentLinkNode<'a> {
    /// The fragment at this end of the link.
    fragment_id: GlobalFragmentId,
    /// The ids of the fragment's actors.
    actor_ids: &'a [GlobalActorId],
}
375
376impl ActorGraphBuildStateInner {
377 fn add_actor(
381 &mut self,
382 (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
383 actor_alignment_id: ActorAlignmentId,
384 vnode_bitmap: Option<Bitmap>,
385 ) {
386 self.fragment_actor_builders
387 .get_mut(&fragment_id)
388 .expect("should added previously")
389 .actor_builders
390 .try_insert(
391 actor_id,
392 ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
393 )
394 .unwrap();
395
396 self.building_locations
397 .try_insert(actor_id, actor_alignment_id)
398 .unwrap();
399 }
400
401 fn record_external_location(
403 &mut self,
404 actor_id: GlobalActorId,
405 actor_alignment_id: ActorAlignmentId,
406 ) {
407 self.external_locations
408 .try_insert(actor_id, actor_alignment_id)
409 .unwrap();
410 }
411
412 fn add_dispatcher(
417 &mut self,
418 fragment_id: GlobalFragmentId,
419 downstream_fragment_id: GlobalFragmentId,
420 dispatch: DispatchStrategy,
421 ) {
422 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
423 builder
424 .downstreams
425 .try_insert(downstream_fragment_id, dispatch)
426 .unwrap();
427 } else {
428 self.upstream_fragment_changes
429 .entry(fragment_id)
430 .or_default()
431 .add_dispatcher(downstream_fragment_id, dispatch);
432 }
433 }
434
435 fn add_upstream(
440 &mut self,
441 fragment_id: GlobalFragmentId,
442 edge_id: EdgeId,
443 upstream_fragment_id: GlobalFragmentId,
444 no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
445 ) {
446 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
447 builder
448 .upstreams
449 .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
450 .unwrap();
451 } else {
452 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
453 unreachable!("edge from internal to external must be `DownstreamExternal`")
454 };
455 self.downstream_fragment_changes
456 .entry(fragment_id)
457 .or_default()
458 .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
459 }
460 }
461
462 fn get_location(&self, actor_id: GlobalActorId) -> ActorAlignmentId {
465 self.building_locations
466 .get(&actor_id)
467 .copied()
468 .or_else(|| self.external_locations.get(&actor_id).copied())
469 .unwrap()
470 }
471
472 fn add_link<'a>(
481 &mut self,
482 upstream: FragmentLinkNode<'a>,
483 downstream: FragmentLinkNode<'a>,
484 edge: &'a StreamFragmentEdge,
485 ) {
486 let dt = edge.dispatch_strategy.r#type();
487
488 match dt {
489 DispatcherType::NoShuffle => {
491 assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
492 let upstream_locations: HashMap<_, _> = upstream
493 .actor_ids
494 .iter()
495 .map(|id| (self.get_location(*id), *id))
496 .collect();
497 let downstream_locations: HashMap<_, _> = downstream
498 .actor_ids
499 .iter()
500 .map(|id| (self.get_location(*id), *id))
501 .collect();
502
503 self.add_dispatcher(
505 upstream.fragment_id,
506 downstream.fragment_id,
507 edge.dispatch_strategy.clone(),
508 );
509
510 self.add_upstream(
512 downstream.fragment_id,
513 edge.id,
514 upstream.fragment_id,
515 Some(
516 downstream_locations
517 .iter()
518 .map(|(location, downstream_actor_id)| {
519 let upstream_actor_id = upstream_locations.get(location).unwrap();
520 (*upstream_actor_id, *downstream_actor_id)
521 })
522 .collect(),
523 ),
524 );
525 }
526
527 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
529 self.add_dispatcher(
530 upstream.fragment_id,
531 downstream.fragment_id,
532 edge.dispatch_strategy.clone(),
533 );
534 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
535 }
536
537 DispatcherType::Unspecified => unreachable!(),
538 }
539 }
540}
541
/// The state of an in-progress actor graph build: the accumulated result plus the
/// bookkeeping that is only needed while building.
struct ActorGraphBuildState {
    /// The accumulated result, returned by `finish`.
    inner: ActorGraphBuildStateInner,

    /// The actor ids of every fragment processed so far.
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// The local id that the next allocated actor will get.
    next_local_id: u32,

    /// Converts local actor ids into global ones.
    actor_id_gen: GlobalActorIdGen,
}
556
557impl ActorGraphBuildState {
558 fn new(actor_id_gen: GlobalActorIdGen) -> Self {
560 Self {
561 inner: Default::default(),
562 fragment_actors: Default::default(),
563 next_local_id: 0,
564 actor_id_gen,
565 }
566 }
567
568 fn next_actor_id(&mut self) -> GlobalActorId {
570 let local_id = self.next_local_id;
571 self.next_local_id += 1;
572
573 self.actor_id_gen.to_global_id(local_id)
574 }
575
576 fn finish(self) -> ActorGraphBuildStateInner {
578 assert_eq!(self.actor_id_gen.len(), self.next_local_id);
580
581 self.inner
582 }
583}
584
/// The result of `ActorGraphBuilder::generate_graph`.
pub struct ActorGraphBuildResult {
    /// The sealed fragments that were built, keyed by fragment id.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream relations of every built fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The locations of the built actors, together with the cluster's worker nodes.
    pub building_locations: Locations,

    /// The new downstream relations of fragments that were not built here.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// For fragments that were not built here: original upstream fragment id -> new
    /// upstream fragment id.
    pub replace_upstream: FragmentReplaceUpstream,

    /// The new no-shuffle links: upstream fragment id -> downstream fragment id ->
    /// (upstream actor id -> downstream actor id).
    pub new_no_shuffle: FragmentNewNoShuffle,
}
609
/// Builds the actor graph of a streaming job from its complete fragment graph and the
/// distributions assigned to its fragments.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The distributions returned by the scheduler for the fragment graph.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The distributions reported by the fragment graph for its existing fragments.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph the actor graph is built from.
    fragment_graph: CompleteStreamFragmentGraph,

    /// The cluster information used for scheduling and for resolving worker locations.
    cluster_info: StreamingClusterInfo,
}
626
627impl ActorGraphBuilder {
628 pub fn new(
631 streaming_job_id: JobId,
632 resource_group: String,
633 fragment_graph: CompleteStreamFragmentGraph,
634 cluster_info: StreamingClusterInfo,
635 default_parallelism: NonZeroUsize,
636 ) -> MetaResult<Self> {
637 let expected_vnode_count = fragment_graph.max_parallelism();
638 let existing_distributions = fragment_graph.existing_distribution();
639
640 let schedulable_workers =
641 cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
642
643 let scheduler = schedule::Scheduler::new(
644 streaming_job_id,
645 &schedulable_workers,
646 default_parallelism,
647 expected_vnode_count,
648 )?;
649
650 let distributions = scheduler.schedule(&fragment_graph)?;
651
652 let mut fragment_graph = fragment_graph;
654 for (id, fragment) in fragment_graph.building_fragments_mut() {
655 let mut error = None;
656 let fragment_vnode_count = distributions[id].vnode_count();
657 visit_tables(fragment, |table, _| {
658 if error.is_some() {
659 return;
660 }
661 let vnode_count = if table.is_singleton() {
664 if fragment_vnode_count > 1 {
665 tracing::info!(
666 table.name,
667 "found singleton table in hash-distributed fragment"
668 );
669 }
670 1
671 } else {
672 fragment_vnode_count
673 };
674 match table.vnode_count_inner().value_opt() {
675 Some(required_vnode_count) if required_vnode_count != vnode_count => {
682 error = Some(format!(
683 "failed to align vnode count for table {}({}): required {}, but got {}",
684 table.id, table.name, required_vnode_count, vnode_count
685 ));
686 }
687 _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
689 }
690 });
691 if let Some(error) = error {
692 bail!(error);
693 }
694 }
695
696 Ok(Self {
697 distributions,
698 existing_distributions,
699 fragment_graph,
700 cluster_info,
701 })
702 }
703
704 fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
707 self.distributions
708 .get(&fragment_id)
709 .or_else(|| self.existing_distributions.get(&fragment_id))
710 .unwrap()
711 }
712
713 fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
715 let actor_locations = actor_locations
716 .into_iter()
717 .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
718 .collect();
719
720 let worker_locations = self
721 .cluster_info
722 .worker_nodes
723 .iter()
724 .map(|(id, node)| (*id as WorkerId, node.clone()))
725 .collect();
726
727 Locations {
728 actor_locations,
729 worker_locations,
730 }
731 }
732
733 pub fn generate_graph(
736 self,
737 env: &MetaSrvEnv,
738 job: &StreamingJob,
739 expr_context: ExprContext,
740 ) -> MetaResult<ActorGraphBuildResult> {
741 let actor_len = self
743 .distributions
744 .values()
745 .map(|d| d.parallelism())
746 .sum::<usize>() as u64;
747 let id_gen = GlobalActorIdGen::new(env.actor_id_generator(), actor_len);
748
749 let ActorGraphBuildStateInner {
751 fragment_actor_builders,
752 building_locations,
753 downstream_fragment_changes,
754 upstream_fragment_changes,
755 external_locations,
756 } = self.build_actor_graph(id_gen)?;
757
758 for alignment_id in external_locations.values() {
759 if self
760 .cluster_info
761 .unschedulable_workers
762 .contains(&alignment_id.worker_id())
763 {
764 bail!(
765 "The worker {} where the associated upstream is located is unschedulable",
766 alignment_id.worker_id(),
767 );
768 }
769 }
770
771 let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
772 let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
773 let graph = {
775 let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
776 HashMap::new();
777
778 for (fragment_id, builder) in fragment_actor_builders {
781 let global_fragment_id = fragment_id.as_global_id();
782 let node = builder.rewrite()?;
783 for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
784 if let Some(no_shuffle_upstream) = no_shuffle_upstream {
785 new_no_shuffle
786 .entry(upstream_fragment_id.as_global_id())
787 .or_default()
788 .try_insert(
789 global_fragment_id,
790 no_shuffle_upstream
791 .iter()
792 .map(|(upstream_actor_id, actor_id)| {
793 (upstream_actor_id.as_global_id(), actor_id.as_global_id())
794 })
795 .collect(),
796 )
797 .expect("non-duplicate");
798 }
799 }
800 downstream_fragment_relations
801 .try_insert(
802 global_fragment_id,
803 builder
804 .downstreams
805 .into_iter()
806 .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
807 .collect(),
808 )
809 .expect("non-duplicate");
810 fragment_actors
811 .try_insert(
812 fragment_id,
813 (
814 node,
815 builder
816 .actor_builders
817 .into_values()
818 .map(|builder| builder.build(job, expr_context.clone()))
819 .try_collect()?,
820 ),
821 )
822 .expect("non-duplicate");
823 }
824
825 {
826 fragment_actors
827 .into_iter()
828 .map(|(fragment_id, (stream_node, actors))| {
829 let distribution = self.distributions[&fragment_id].clone();
830 let fragment = self.fragment_graph.seal_fragment(
831 fragment_id,
832 actors,
833 distribution,
834 stream_node,
835 );
836 let fragment_id = fragment_id.as_global_id();
837 (fragment_id, fragment)
838 })
839 .collect()
840 }
841 };
842
843 let building_locations = self.build_locations(building_locations);
845
846 let upstream_fragment_downstreams = upstream_fragment_changes
848 .into_iter()
849 .map(|(fragment_id, changes)| {
850 (
851 fragment_id.as_global_id(),
852 changes
853 .new_downstreams
854 .into_iter()
855 .map(|(downstream_fragment_id, new_dispatch)| {
856 (downstream_fragment_id.as_global_id(), new_dispatch).into()
857 })
858 .collect(),
859 )
860 })
861 .collect();
862
863 let replace_upstream = downstream_fragment_changes
865 .into_iter()
866 .map(|(fragment_id, changes)| {
867 let fragment_id = fragment_id.as_global_id();
868 let new_no_shuffle = &mut new_no_shuffle;
869 (
870 fragment_id,
871 changes
872 .new_upstreams
873 .into_iter()
874 .map(
875 move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
876 let upstream_fragment_id = upstream_fragment_id.as_global_id();
877 if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
878 && !upstream_new_no_shuffle.is_empty()
879 {
880 let no_shuffle_actors = new_no_shuffle
881 .entry(upstream_fragment_id)
882 .or_default()
883 .entry(fragment_id)
884 .or_default();
885 no_shuffle_actors.extend(
886 upstream_new_no_shuffle.into_iter().map(
887 |(upstream_actor_id, actor_id)| {
888 (
889 upstream_actor_id.as_global_id(),
890 actor_id.as_global_id(),
891 )
892 },
893 ),
894 );
895 }
896 let DownstreamExternalEdgeId {
897 original_upstream_fragment_id,
898 ..
899 } = edge_id;
900 (
901 original_upstream_fragment_id.as_global_id(),
902 upstream_fragment_id,
903 )
904 },
905 )
906 .collect(),
907 )
908 })
909 .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
910 .collect();
911
912 Ok(ActorGraphBuildResult {
913 graph,
914 downstream_fragment_relations,
915 building_locations,
916 upstream_fragment_downstreams,
917 replace_upstream,
918 new_no_shuffle,
919 })
920 }
921
922 fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
924 let mut state = ActorGraphBuildState::new(id_gen);
925
926 for fragment_id in self.fragment_graph.topo_order()? {
929 self.build_actor_graph_fragment(fragment_id, &mut state)?;
930 }
931
932 Ok(state.finish())
933 }
934
935 fn build_actor_graph_fragment(
937 &self,
938 fragment_id: GlobalFragmentId,
939 state: &mut ActorGraphBuildState,
940 ) -> MetaResult<()> {
941 let current_fragment = self.fragment_graph.get_fragment(fragment_id);
942 let distribution = self.get_distribution(fragment_id);
943
944 let actor_ids = match current_fragment {
946 EitherFragment::Building(current_fragment) => {
948 let node = current_fragment.node.clone().unwrap();
949 state
950 .inner
951 .fragment_actor_builders
952 .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
953 .expect("non-duplicate");
954 let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
955
956 distribution
957 .actors()
958 .map(|alignment_id| {
959 let actor_id = state.next_actor_id();
960 let vnode_bitmap = bitmaps
961 .as_ref()
962 .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
963 .cloned();
964
965 state
966 .inner
967 .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
968
969 actor_id
970 })
971 .collect_vec()
972 }
973
974 EitherFragment::Existing(existing_fragment) => existing_fragment
976 .actors
977 .iter()
978 .map(|(actor_id, actor_info)| {
979 let actor_id = GlobalActorId::new(*actor_id);
980 let alignment_id = match &distribution {
981 Distribution::Singleton(worker_id) => {
982 ActorAlignmentId::new_single(*worker_id as u32)
983 }
984 Distribution::Hash(mapping) => mapping
985 .get_matched(actor_info.vnode_bitmap.as_ref().unwrap())
986 .unwrap(),
987 };
988
989 state.inner.record_external_location(actor_id, alignment_id);
990
991 actor_id
992 })
993 .collect_vec(),
994 };
995
996 for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
998 let downstream_actors = state
999 .fragment_actors
1000 .get(&downstream_fragment_id)
1001 .expect("downstream fragment not processed yet");
1002
1003 state.inner.add_link(
1004 FragmentLinkNode {
1005 fragment_id,
1006 actor_ids: &actor_ids,
1007 },
1008 FragmentLinkNode {
1009 fragment_id: downstream_fragment_id,
1010 actor_ids: downstream_actors,
1011 },
1012 edge,
1013 );
1014 }
1015
1016 state
1018 .fragment_actors
1019 .try_insert(fragment_id, actor_ids)
1020 .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1021
1022 Ok(())
1023 }
1024}