1use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount, VnodeCountCompat};
23use risingwave_common::id::JobId;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::stream_graph_visitor::visit_tables;
26use risingwave_pb::stream_plan::stream_node::NodeBody;
27use risingwave_pb::stream_plan::{
28 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
29};
30
31use super::Locations;
32use crate::MetaResult;
33use crate::controller::cluster::StreamingClusterInfo;
34use crate::manager::{MetaSrvEnv, StreamingJob};
35use crate::model::{
36 Fragment, FragmentDownstreamRelation, FragmentId, FragmentReplaceUpstream, StreamActor,
37 StreamContext,
38};
39use crate::stream::stream_graph::fragment::{
40 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
41 StreamFragmentEdge,
42};
43use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
44use crate::stream::stream_graph::schedule;
45use crate::stream::stream_graph::schedule::Distribution;
46
/// Builder for a single [`StreamActor`] of a fragment that is being created.
#[derive(Debug)]
struct ActorBuilder {
    /// Global id allocated for this actor.
    actor_id: GlobalActorId,

    /// Id of the fragment this actor belongs to.
    fragment_id: GlobalFragmentId,

    /// Vnode bitmap owned by this actor; `None` when the fragment's distribution is not a
    /// hash distribution (bitmaps are only derived from `distribution.as_hash()`).
    vnode_bitmap: Option<Bitmap>,
}
59
60impl ActorBuilder {
61 fn new(
62 actor_id: GlobalActorId,
63 fragment_id: GlobalFragmentId,
64 vnode_bitmap: Option<Bitmap>,
65 ) -> Self {
66 Self {
67 actor_id,
68 fragment_id,
69 vnode_bitmap,
70 }
71 }
72}
73
74impl FragmentActorBuilder {
75 fn rewrite(&self) -> MetaResult<StreamNode> {
81 self.rewrite_inner(&self.node, 0)
82 }
83
84 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
85 match stream_node.get_node_body()? {
86 NodeBody::Exchange(exchange) => {
88 if depth == 0 {
91 bail!(
92 "there should be no ExchangeNode on the top of the plan node: {:#?}",
93 stream_node
94 )
95 }
96 assert!(!stream_node.get_fields().is_empty());
97 assert!(stream_node.input.is_empty());
98
99 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
101 link_id: stream_node.get_operator_id().as_raw_id(),
102 }];
103
104 let upstream_fragment_id = upstream_fragment_id.as_global_id();
105
106 Ok(StreamNode {
107 node_body: Some(NodeBody::Merge(Box::new({
108 MergeNode {
109 upstream_fragment_id,
110 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
111 ..Default::default()
112 }
113 }))),
114 identity: "MergeExecutor".to_owned(),
115 ..stream_node.clone()
116 })
117 }
118
119 NodeBody::StreamScan(stream_scan) => {
121 let input = stream_node.get_input();
122 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
123 assert!(input.is_empty());
126 return Ok(stream_node.clone());
127 }
128 assert_eq!(input.len(), 2);
129
130 let merge_node = &input[0];
131 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
132 let batch_plan_node = &input[1];
133 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
134
135 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
137 [&EdgeId::UpstreamExternal {
138 upstream_job_id: stream_scan.table_id.as_job_id(),
139 downstream_fragment_id: self.fragment_id,
140 }];
141
142 let is_shuffled_backfill = stream_scan.stream_scan_type
143 == StreamScanType::ArrangementBackfill as i32
144 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
145 if !is_shuffled_backfill {
146 assert!(*upstream_no_shuffle_actor);
147 }
148
149 let upstream_dispatcher_type = if is_shuffled_backfill {
150 DispatcherType::Hash as _
153 } else {
154 DispatcherType::NoShuffle as _
155 };
156
157 let upstream_fragment_id = upstream_fragment_id.as_global_id();
158
159 let input = vec![
160 StreamNode {
162 node_body: Some(NodeBody::Merge(Box::new({
163 MergeNode {
164 upstream_fragment_id,
165 upstream_dispatcher_type,
166 ..Default::default()
167 }
168 }))),
169 ..merge_node.clone()
170 },
171 batch_plan_node.clone(),
172 ];
173
174 Ok(StreamNode {
175 input,
176 ..stream_node.clone()
177 })
178 }
179
180 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
184 let input = stream_node.get_input();
185 assert_eq!(input.len(), 1);
186
187 let merge_node = &input[0];
188 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
189
190 let upstream_source_id = match stream_node.get_node_body()? {
191 NodeBody::CdcFilter(node) => node.upstream_source_id,
192 NodeBody::SourceBackfill(node) => node.upstream_source_id,
193 _ => unreachable!(),
194 };
195
196 let (upstream_fragment_id, upstream_is_no_shuffle) = &self.upstreams
198 [&EdgeId::UpstreamExternal {
199 upstream_job_id: upstream_source_id.as_share_source_job_id(),
200 downstream_fragment_id: self.fragment_id,
201 }];
202
203 assert!(
204 *upstream_is_no_shuffle,
205 "Upstream Cdc Source should be singleton. \
206 SourceBackfill is NoShuffle 1-1 correspondence. \
207 So they both should have only one upstream actor."
208 );
209
210 let upstream_fragment_id = upstream_fragment_id.as_global_id();
211
212 let input = vec![
214 StreamNode {
216 node_body: Some(NodeBody::Merge(Box::new({
217 MergeNode {
218 upstream_fragment_id,
219 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
220 ..Default::default()
221 }
222 }))),
223 ..merge_node.clone()
224 },
225 ];
226 Ok(StreamNode {
227 input,
228 ..stream_node.clone()
229 })
230 }
231
232 _ => {
234 let mut new_stream_node = stream_node.clone();
235 for (input, new_input) in stream_node
236 .input
237 .iter()
238 .zip_eq_fast(&mut new_stream_node.input)
239 {
240 *new_input = self.rewrite_inner(input, depth + 1)?;
241 }
242 Ok(new_stream_node)
243 }
244 }
245 }
246}
247
248impl ActorBuilder {
249 fn build(self, job: &StreamingJob, ctx: &StreamContext) -> MetaResult<StreamActor> {
251 #[cfg(not(debug_assertions))]
253 let mview_definition = job.name();
254 #[cfg(debug_assertions)]
255 let mview_definition = job.definition();
256
257 Ok(StreamActor {
258 actor_id: self.actor_id.as_global_id(),
259 fragment_id: self.fragment_id.as_global_id(),
260 vnode_bitmap: self.vnode_bitmap,
261 mview_definition,
262 expr_context: Some(ctx.to_expr_context()),
263 config_override: ctx.config_override.clone(),
264 })
265 }
266}
267
268#[derive(Default)]
273struct UpstreamFragmentChange {
274 new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
276}
277
278#[derive(Default)]
279struct DownstreamFragmentChange {
280 new_upstreams: HashMap<DownstreamExternalEdgeId, GlobalFragmentId>,
283}
284
285impl UpstreamFragmentChange {
286 fn add_dispatcher(
288 &mut self,
289 downstream_fragment_id: GlobalFragmentId,
290 dispatch: DispatchStrategy,
291 ) {
292 self.new_downstreams
293 .try_insert(downstream_fragment_id, dispatch)
294 .unwrap();
295 }
296}
297
298impl DownstreamFragmentChange {
299 fn add_upstream(
301 &mut self,
302 edge_id: DownstreamExternalEdgeId,
303 new_upstream_fragment_id: GlobalFragmentId,
304 ) {
305 self.new_upstreams
306 .try_insert(edge_id, new_upstream_fragment_id)
307 .unwrap();
308 }
309}
310
/// Alignment id assigned to each actor, keyed (and ordered) by actor id.
type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
313
/// Per-fragment state collected for a fragment that is being created while the actor graph is
/// built.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// Id of the fragment.
    fragment_id: GlobalFragmentId,
    /// The fragment's plan, before its exchange/merge placeholders are rewired by `rewrite`.
    node: StreamNode,
    /// Builders of the fragment's actors, keyed by actor id.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// Dispatch strategy towards each downstream fragment.
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    /// Upstream fragment of each incoming edge, plus whether that edge is no-shuffle.
    upstreams: HashMap<EdgeId, (GlobalFragmentId, bool)>,
}
323
324impl FragmentActorBuilder {
325 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
326 Self {
327 fragment_id,
328 node,
329 actor_builders: Default::default(),
330 downstreams: Default::default(),
331 upstreams: Default::default(),
332 }
333 }
334}
335
/// State accumulated while walking the fragment graph; returned by
/// `ActorGraphBuildState::finish`.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// Builders of the fragments being created, keyed by fragment id.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// Alignment ids of the newly created actors.
    building_locations: ActorLocations,

    /// Changes to existing fragments that get a new upstream from the job being built.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// Changes to existing fragments that get a new downstream in the job being built.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// Alignment ids of the actors of existing fragments visited during the walk.
    external_locations: ActorLocations,
}
361
/// One endpoint of a fragment edge: the fragment and the ids of all its actors.
struct FragmentLinkNode<'a> {
    fragment_id: GlobalFragmentId,
    actor_ids: &'a [GlobalActorId],
}
367
368impl ActorGraphBuildStateInner {
369 fn add_actor(
373 &mut self,
374 (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
375 actor_alignment_id: ActorAlignmentId,
376 vnode_bitmap: Option<Bitmap>,
377 ) {
378 self.fragment_actor_builders
379 .get_mut(&fragment_id)
380 .expect("should added previously")
381 .actor_builders
382 .try_insert(
383 actor_id,
384 ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
385 )
386 .unwrap();
387
388 self.building_locations
389 .try_insert(actor_id, actor_alignment_id)
390 .unwrap();
391 }
392
393 fn record_external_location(
395 &mut self,
396 actor_id: GlobalActorId,
397 actor_alignment_id: ActorAlignmentId,
398 ) {
399 self.external_locations
400 .try_insert(actor_id, actor_alignment_id)
401 .unwrap();
402 }
403
404 fn add_dispatcher(
409 &mut self,
410 fragment_id: GlobalFragmentId,
411 downstream_fragment_id: GlobalFragmentId,
412 dispatch: DispatchStrategy,
413 ) {
414 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
415 builder
416 .downstreams
417 .try_insert(downstream_fragment_id, dispatch)
418 .unwrap();
419 } else {
420 self.upstream_fragment_changes
421 .entry(fragment_id)
422 .or_default()
423 .add_dispatcher(downstream_fragment_id, dispatch);
424 }
425 }
426
427 fn add_upstream(
432 &mut self,
433 fragment_id: GlobalFragmentId,
434 edge_id: EdgeId,
435 upstream_fragment_id: GlobalFragmentId,
436 is_no_shuffle: bool,
437 ) {
438 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
439 builder
440 .upstreams
441 .try_insert(edge_id, (upstream_fragment_id, is_no_shuffle))
442 .unwrap();
443 } else {
444 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
445 unreachable!("edge from internal to external must be `DownstreamExternal`")
446 };
447 self.downstream_fragment_changes
448 .entry(fragment_id)
449 .or_default()
450 .add_upstream(edge_id, upstream_fragment_id);
451 }
452 }
453
454 fn add_link<'a>(
463 &mut self,
464 upstream: FragmentLinkNode<'a>,
465 downstream: FragmentLinkNode<'a>,
466 edge: &'a StreamFragmentEdge,
467 ) {
468 let dt = edge.dispatch_strategy.r#type();
469
470 match dt {
471 DispatcherType::NoShuffle => {
473 assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
474
475 self.add_dispatcher(
477 upstream.fragment_id,
478 downstream.fragment_id,
479 edge.dispatch_strategy.clone(),
480 );
481
482 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, true);
484 }
485
486 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
488 self.add_dispatcher(
489 upstream.fragment_id,
490 downstream.fragment_id,
491 edge.dispatch_strategy.clone(),
492 );
493 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, false);
494 }
495
496 DispatcherType::Unspecified => unreachable!(),
497 }
498 }
499}
500
/// Mutable state threaded through `ActorGraphBuilder::build_actor_graph`.
struct ActorGraphBuildState {
    /// Accumulated result, handed back by `finish`.
    inner: ActorGraphBuildStateInner,

    /// Actor ids of every fragment processed so far (building and existing fragments alike).
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// Next local index to be mapped to a global actor id through `actor_id_gen`.
    next_local_id: u32,

    /// Maps local indices to global actor ids.
    actor_id_gen: GlobalActorIdGen,
}
515
516impl ActorGraphBuildState {
517 fn new(actor_id_gen: GlobalActorIdGen) -> Self {
519 Self {
520 inner: Default::default(),
521 fragment_actors: Default::default(),
522 next_local_id: 0,
523 actor_id_gen,
524 }
525 }
526
527 fn next_actor_id(&mut self) -> GlobalActorId {
529 let local_id = self.next_local_id;
530 self.next_local_id += 1;
531
532 self.actor_id_gen.to_global_id(local_id)
533 }
534
535 fn finish(self) -> ActorGraphBuildStateInner {
537 assert_eq!(self.actor_id_gen.len(), self.next_local_id);
539
540 self.inner
541 }
542}
543
/// Output of `ActorGraphBuilder::generate_graph`.
pub struct ActorGraphBuildResult {
    /// Sealed fragments of the job being built, keyed by fragment id.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// Downstream relations (target fragment and dispatch strategy) of each building fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// Locations of the newly created actors, together with all worker nodes of the cluster.
    pub building_locations: Locations,

    /// New downstream relations to add to existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// For each existing downstream fragment: original upstream fragment id -> new upstream
    /// fragment id. Fragments with no replacement are omitted.
    pub replace_upstream: FragmentReplaceUpstream,
}
563
/// Builds the actor graph of a streaming job from its complete fragment graph and the
/// scheduling decision made for each fragment.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// Distribution chosen by the scheduler for the fragments being built.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// Current distribution of the existing fragments the graph connects to.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// Fragment graph of the job, including the existing fragments it connects to.
    fragment_graph: CompleteStreamFragmentGraph,

    /// Cluster information used for scheduling and for reporting worker locations.
    cluster_info: StreamingClusterInfo,
}
580
impl ActorGraphBuilder {
    /// Schedules the building fragments of `fragment_graph` onto the workers of
    /// `resource_group`, then fixes the vnode count of every table in those fragments.
    ///
    /// # Errors
    /// Fails if scheduling fails. Also fails if a table already requires a vnode count that
    /// differs from the one derived from its fragment's distribution (1 for singleton tables).
    pub fn new(
        streaming_job_id: JobId,
        resource_group: String,
        fragment_graph: CompleteStreamFragmentGraph,
        cluster_info: StreamingClusterInfo,
        default_parallelism: NonZeroUsize,
    ) -> MetaResult<Self> {
        let expected_vnode_count = fragment_graph.max_parallelism();
        let existing_distributions = fragment_graph.existing_distribution();

        let schedulable_workers = cluster_info.filter_workers_by_resource_group(&resource_group);

        let scheduler = schedule::Scheduler::new(
            streaming_job_id,
            &schedulable_workers,
            default_parallelism,
            expected_vnode_count,
        )?;

        let distributions = scheduler.schedule(&fragment_graph)?;

        // Fill in the vnode count of every table in the building fragments from the fragment's
        // distribution, rejecting tables that already require a different count.
        let mut fragment_graph = fragment_graph;
        for (id, fragment) in fragment_graph.building_fragments_mut() {
            // The visitor closure returns `()`, so the first failure is stashed here and
            // reported once the visit is over.
            let mut error = None;
            let fragment_vnode_count = distributions[id].vnode_count();
            visit_tables(fragment, |table, _| {
                if error.is_some() {
                    return;
                }
                let vnode_count = if table.is_singleton() {
                    // A singleton table always gets a single vnode, even inside a
                    // hash-distributed fragment.
                    if fragment_vnode_count > 1 {
                        tracing::info!(
                            table.name,
                            "found singleton table in hash-distributed fragment"
                        );
                    }
                    1
                } else {
                    fragment_vnode_count
                };
                match table.vnode_count_inner().value_opt() {
                    // The table already pins a vnode count that disagrees with the derived one.
                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
                        error = Some(format!(
                            "failed to align vnode count for table {}({}): required {}, but got {}",
                            table.id, table.name, required_vnode_count, vnode_count
                        ));
                    }
                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
                }
            });
            if let Some(error) = error {
                bail!(error);
            }
        }

        Ok(Self {
            distributions,
            existing_distributions,
            fragment_graph,
            cluster_info,
        })
    }

    /// Returns the distribution of `fragment_id`. Fragments being built are looked up first,
    /// then existing fragments.
    ///
    /// # Panics
    /// Panics if the fragment appears in neither map.
    fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
        self.distributions
            .get(&fragment_id)
            .or_else(|| self.existing_distributions.get(&fragment_id))
            .unwrap()
    }

    /// Converts actor alignment ids into [`Locations`], attaching every worker node known to
    /// the cluster.
    fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
        let actor_locations = actor_locations
            .into_iter()
            .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
            .collect();

        let worker_locations = self
            .cluster_info
            .worker_nodes
            .iter()
            .map(|(id, node)| (*id, node.clone()))
            .collect();

        Locations {
            actor_locations,
            worker_locations,
        }
    }

    /// Allocates actor ids, builds the actors of every building fragment, and seals those
    /// fragments into the final graph.
    ///
    /// The result also reports the new dispatchers and upstream replacements that must be
    /// applied to existing fragments connected to this job.
    ///
    /// # Errors
    /// Propagates failures from building the actor graph, rewriting fragment plans, or
    /// building actors.
    pub fn generate_graph(
        self,
        env: &MetaSrvEnv,
        job: &StreamingJob,
        ctx: StreamContext,
    ) -> MetaResult<ActorGraphBuildResult> {
        // Reserve one actor id per unit of parallelism across all scheduled fragments.
        let actor_len = self
            .distributions
            .values()
            .map(|d| d.parallelism())
            .sum::<usize>() as u64;
        let id_gen = GlobalActorIdGen::new(env.actor_id_generator(), actor_len);

        let ActorGraphBuildStateInner {
            fragment_actor_builders,
            building_locations,
            downstream_fragment_changes,
            upstream_fragment_changes,
            ..
        } = self.build_actor_graph(id_gen)?;

        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
        let graph = {
            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
                HashMap::new();

            // Rewrite each building fragment's plan, record its downstream relations and build
            // its actors.
            for (fragment_id, builder) in fragment_actor_builders {
                let global_fragment_id = fragment_id.as_global_id();
                let node = builder.rewrite()?;
                downstream_fragment_relations
                    .try_insert(
                        global_fragment_id,
                        builder
                            .downstreams
                            .into_iter()
                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
                            .collect(),
                    )
                    .expect("non-duplicate");
                fragment_actors
                    .try_insert(
                        fragment_id,
                        (
                            node,
                            builder
                                .actor_builders
                                .into_values()
                                .map(|builder| builder.build(job, &ctx))
                                .try_collect()?,
                        ),
                    )
                    .expect("non-duplicate");
            }

            // Seal every fragment with its scheduled distribution.
            {
                fragment_actors
                    .into_iter()
                    .map(|(fragment_id, (stream_node, actors))| {
                        let distribution = self.distributions[&fragment_id].clone();
                        let fragment = self.fragment_graph.seal_fragment(
                            fragment_id,
                            actors,
                            distribution,
                            stream_node,
                        );
                        let fragment_id = fragment_id.as_global_id();
                        (fragment_id, fragment)
                    })
                    .collect()
            }
        };

        let building_locations = self.build_locations(building_locations);

        // New dispatchers for existing upstream fragments.
        let upstream_fragment_downstreams = upstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                (
                    fragment_id.as_global_id(),
                    changes
                        .new_downstreams
                        .into_iter()
                        .map(|(downstream_fragment_id, new_dispatch)| {
                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
                        })
                        .collect(),
                )
            })
            .collect();

        // For existing downstream fragments: original upstream fragment -> new upstream
        // fragment, skipping fragments with no replacement.
        let replace_upstream = downstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                let fragment_id = fragment_id.as_global_id();
                (
                    fragment_id,
                    changes
                        .new_upstreams
                        .into_iter()
                        .map(move |(edge_id, upstream_fragment_id)| {
                            let upstream_fragment_id = upstream_fragment_id.as_global_id();
                            let DownstreamExternalEdgeId {
                                original_upstream_fragment_id,
                                ..
                            } = edge_id;
                            (
                                original_upstream_fragment_id.as_global_id(),
                                upstream_fragment_id,
                            )
                        })
                        .collect(),
                )
            })
            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
            .collect();

        Ok(ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            building_locations,
            upstream_fragment_downstreams,
            replace_upstream,
        })
    }

    /// Visits every fragment in `topo_order` and builds (or records) its actors and links.
    fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
        let mut state = ActorGraphBuildState::new(id_gen);

        // `build_actor_graph_fragment` requires a fragment's downstreams to be processed before
        // the fragment itself, so `topo_order` is expected to yield downstream fragments first.
        for fragment_id in self.fragment_graph.topo_order()? {
            self.build_actor_graph_fragment(fragment_id, &mut state)?;
        }

        Ok(state.finish())
    }

    /// Creates the actors of a building fragment, or records the locations of an existing
    /// fragment's actors. Then links the fragment to each of its downstream fragments.
    ///
    /// # Panics
    /// Panics if a downstream fragment has not been processed yet, or if `fragment_id` has
    /// already been processed.
    fn build_actor_graph_fragment(
        &self,
        fragment_id: GlobalFragmentId,
        state: &mut ActorGraphBuildState,
    ) -> MetaResult<()> {
        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
        let distribution = self.get_distribution(fragment_id);

        let actor_ids = match current_fragment {
            // A building fragment: allocate a new actor for each alignment id of its distribution.
            EitherFragment::Building(current_fragment) => {
                let node = current_fragment.node.clone().unwrap();
                state
                    .inner
                    .fragment_actor_builders
                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
                    .expect("non-duplicate");
                // Vnode bitmaps exist only for hash distributions; otherwise actors get `None`.
                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());

                distribution
                    .actors()
                    .map(|alignment_id| {
                        let actor_id = state.next_actor_id();
                        let vnode_bitmap = bitmaps
                            .as_ref()
                            .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
                            .cloned();

                        state
                            .inner
                            .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);

                        actor_id
                    })
                    .collect_vec()
            }

            // An existing fragment: reuse its actors and only record their alignment ids.
            EitherFragment::Existing(existing_fragment) => existing_fragment
                .actors
                .iter()
                .map(|(actor_id, actor_info)| {
                    let actor_id = GlobalActorId::new(*actor_id);
                    let alignment_id = match &distribution {
                        Distribution::Singleton(worker_id) => {
                            ActorAlignmentId::new_single(*worker_id)
                        }
                        Distribution::Hash(mapping) => mapping
                            .get_matched(actor_info.vnode_bitmap.as_ref().unwrap())
                            .unwrap(),
                    };

                    state.inner.record_external_location(actor_id, alignment_id);

                    actor_id
                })
                .collect_vec(),
        };

        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
            let downstream_actors = state
                .fragment_actors
                .get(&downstream_fragment_id)
                .expect("downstream fragment not processed yet");

            state.inner.add_link(
                FragmentLinkNode {
                    fragment_id,
                    actor_ids: &actor_ids,
                },
                FragmentLinkNode {
                    fragment_id: downstream_fragment_id,
                    actor_ids: downstream_actors,
                },
                edge,
            );
        }

        state
            .fragment_actors
            .try_insert(fragment_id, actor_ids)
            .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));

        Ok(())
    }
}