1use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use risingwave_common::bail;
20use risingwave_common::hash::{IsSingleton, VnodeCount, VnodeCountCompat};
21use risingwave_common::id::JobId;
22use risingwave_common::util::iter_util::ZipEqFast;
23use risingwave_common::util::stream_graph_visitor::visit_tables;
24use risingwave_pb::stream_plan::stream_node::NodeBody;
25use risingwave_pb::stream_plan::{
26 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
27};
28
29use crate::MetaResult;
30use crate::model::{Fragment, FragmentDownstreamRelation, FragmentId, FragmentReplaceUpstream};
31use crate::stream::stream_graph::fragment::{
32 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
33 StreamFragmentEdge,
34};
35use crate::stream::stream_graph::id::GlobalFragmentId;
36use crate::stream::stream_graph::schedule;
37use crate::stream::stream_graph::schedule::Distribution;
38
39impl FragmentActorBuilder {
40 fn rewrite(&self) -> MetaResult<StreamNode> {
46 self.rewrite_inner(&self.node, 0)
47 }
48
49 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
50 match stream_node.get_node_body()? {
51 NodeBody::Exchange(exchange) => {
53 if depth == 0 {
56 bail!(
57 "there should be no ExchangeNode on the top of the plan node: {:#?}",
58 stream_node
59 )
60 }
61 assert!(!stream_node.get_fields().is_empty());
62 assert!(stream_node.input.is_empty());
63
64 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
66 link_id: stream_node.get_operator_id().as_raw_id(),
67 }];
68
69 let upstream_fragment_id = upstream_fragment_id.as_global_id();
70
71 Ok(StreamNode {
72 node_body: Some(NodeBody::Merge(Box::new({
73 MergeNode {
74 upstream_fragment_id,
75 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
76 ..Default::default()
77 }
78 }))),
79 identity: "MergeExecutor".to_owned(),
80 ..stream_node.clone()
81 })
82 }
83
84 NodeBody::StreamScan(stream_scan) => {
86 let input = stream_node.get_input();
87 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
88 assert!(input.is_empty());
91 return Ok(stream_node.clone());
92 }
93 assert_eq!(input.len(), 2);
94
95 let merge_node = &input[0];
96 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
97 let batch_plan_node = &input[1];
98 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
99
100 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
102 [&EdgeId::UpstreamExternal {
103 upstream_job_id: stream_scan.table_id.as_job_id(),
104 downstream_fragment_id: self.fragment_id,
105 }];
106
107 let is_shuffled_backfill = stream_scan.stream_scan_type
108 == StreamScanType::ArrangementBackfill as i32
109 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
110 if !is_shuffled_backfill {
111 assert!(*upstream_no_shuffle_actor);
112 }
113
114 let upstream_dispatcher_type = if is_shuffled_backfill {
115 DispatcherType::Hash as _
118 } else {
119 DispatcherType::NoShuffle as _
120 };
121
122 let upstream_fragment_id = upstream_fragment_id.as_global_id();
123
124 let input = vec![
125 StreamNode {
127 node_body: Some(NodeBody::Merge(Box::new({
128 MergeNode {
129 upstream_fragment_id,
130 upstream_dispatcher_type,
131 ..Default::default()
132 }
133 }))),
134 ..merge_node.clone()
135 },
136 batch_plan_node.clone(),
137 ];
138
139 Ok(StreamNode {
140 input,
141 ..stream_node.clone()
142 })
143 }
144
145 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
149 let input = stream_node.get_input();
150 assert_eq!(input.len(), 1);
151
152 let merge_node = &input[0];
153 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
154
155 let upstream_source_id = match stream_node.get_node_body()? {
156 NodeBody::CdcFilter(node) => node.upstream_source_id,
157 NodeBody::SourceBackfill(node) => node.upstream_source_id,
158 _ => unreachable!(),
159 };
160
161 let (upstream_fragment_id, upstream_is_no_shuffle) = &self.upstreams
163 [&EdgeId::UpstreamExternal {
164 upstream_job_id: upstream_source_id.as_share_source_job_id(),
165 downstream_fragment_id: self.fragment_id,
166 }];
167
168 assert!(
169 *upstream_is_no_shuffle,
170 "Upstream Cdc Source should be singleton. \
171 SourceBackfill is NoShuffle 1-1 correspondence. \
172 So they both should have only one upstream actor."
173 );
174
175 let upstream_fragment_id = upstream_fragment_id.as_global_id();
176
177 let input = vec![
179 StreamNode {
181 node_body: Some(NodeBody::Merge(Box::new({
182 MergeNode {
183 upstream_fragment_id,
184 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
185 ..Default::default()
186 }
187 }))),
188 ..merge_node.clone()
189 },
190 ];
191 Ok(StreamNode {
192 input,
193 ..stream_node.clone()
194 })
195 }
196
197 _ => {
199 let mut new_stream_node = stream_node.clone();
200 for (input, new_input) in stream_node
201 .input
202 .iter()
203 .zip_eq_fast(&mut new_stream_node.input)
204 {
205 *new_input = self.rewrite_inner(input, depth + 1)?;
206 }
207 Ok(new_stream_node)
208 }
209 }
210 }
211}
212
213#[derive(Default)]
218struct UpstreamFragmentChange {
219 new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
221}
222
223#[derive(Default)]
224struct DownstreamFragmentChange {
225 new_upstreams: HashMap<DownstreamExternalEdgeId, GlobalFragmentId>,
228}
229
230impl UpstreamFragmentChange {
231 fn add_dispatcher(
233 &mut self,
234 downstream_fragment_id: GlobalFragmentId,
235 dispatch: DispatchStrategy,
236 ) {
237 self.new_downstreams
238 .try_insert(downstream_fragment_id, dispatch)
239 .unwrap();
240 }
241}
242
243impl DownstreamFragmentChange {
244 fn add_upstream(
246 &mut self,
247 edge_id: DownstreamExternalEdgeId,
248 new_upstream_fragment_id: GlobalFragmentId,
249 ) {
250 self.new_upstreams
251 .try_insert(edge_id, new_upstream_fragment_id)
252 .unwrap();
253 }
254}
255
256#[derive(Debug)]
257struct FragmentActorBuilder {
258 fragment_id: GlobalFragmentId,
259 node: StreamNode,
260 downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
261 upstreams: HashMap<EdgeId, (GlobalFragmentId, bool)>,
263}
264
265impl FragmentActorBuilder {
266 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
267 Self {
268 fragment_id,
269 node,
270 downstreams: Default::default(),
271 upstreams: Default::default(),
272 }
273 }
274}
275
276#[derive(Default)]
283struct ActorGraphBuildStateInner {
284 fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,
286
287 downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,
290
291 upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
294}
295
296struct FragmentLinkNode {
298 fragment_id: GlobalFragmentId,
299}
300
301impl ActorGraphBuildStateInner {
302 fn add_dispatcher(
307 &mut self,
308 fragment_id: GlobalFragmentId,
309 downstream_fragment_id: GlobalFragmentId,
310 dispatch: DispatchStrategy,
311 ) {
312 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
313 builder
314 .downstreams
315 .try_insert(downstream_fragment_id, dispatch)
316 .unwrap();
317 } else {
318 self.upstream_fragment_changes
319 .entry(fragment_id)
320 .or_default()
321 .add_dispatcher(downstream_fragment_id, dispatch);
322 }
323 }
324
325 fn add_upstream(
330 &mut self,
331 fragment_id: GlobalFragmentId,
332 edge_id: EdgeId,
333 upstream_fragment_id: GlobalFragmentId,
334 is_no_shuffle: bool,
335 ) {
336 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
337 builder
338 .upstreams
339 .try_insert(edge_id, (upstream_fragment_id, is_no_shuffle))
340 .unwrap();
341 } else {
342 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
343 unreachable!("edge from internal to external must be `DownstreamExternal`")
344 };
345 self.downstream_fragment_changes
346 .entry(fragment_id)
347 .or_default()
348 .add_upstream(edge_id, upstream_fragment_id);
349 }
350 }
351
352 fn add_link(
361 &mut self,
362 upstream: FragmentLinkNode,
363 downstream: FragmentLinkNode,
364 edge: &StreamFragmentEdge,
365 ) {
366 let dt = edge.dispatch_strategy.r#type();
367
368 match dt {
369 DispatcherType::NoShuffle => {
371 self.add_dispatcher(
373 upstream.fragment_id,
374 downstream.fragment_id,
375 edge.dispatch_strategy.clone(),
376 );
377
378 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, true);
380 }
381
382 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
384 self.add_dispatcher(
385 upstream.fragment_id,
386 downstream.fragment_id,
387 edge.dispatch_strategy.clone(),
388 );
389 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, false);
390 }
391
392 DispatcherType::Unspecified => unreachable!(),
393 }
394 }
395}
396
397struct ActorGraphBuildState {
399 inner: ActorGraphBuildStateInner,
401}
402
403impl ActorGraphBuildState {
404 fn new() -> Self {
406 Self {
407 inner: Default::default(),
408 }
409 }
410
411 fn finish(self) -> ActorGraphBuildStateInner {
413 self.inner
414 }
415}
416
417pub struct ActorGraphBuildResult {
420 pub graph: BTreeMap<FragmentId, Fragment>,
422 pub downstream_fragment_relations: FragmentDownstreamRelation,
425
426 pub upstream_fragment_downstreams: FragmentDownstreamRelation,
428
429 pub replace_upstream: FragmentReplaceUpstream,
432}
433
434#[derive(Debug)]
437pub struct ActorGraphBuilder {
438 distributions: HashMap<GlobalFragmentId, Distribution>,
440
441 fragment_graph: CompleteStreamFragmentGraph,
443}
444
445impl ActorGraphBuilder {
446 pub fn new(
449 streaming_job_id: JobId,
450 fragment_graph: CompleteStreamFragmentGraph,
451 default_parallelism: NonZeroUsize,
452 ) -> MetaResult<Self> {
453 let expected_vnode_count = fragment_graph.max_parallelism();
454
455 let scheduler =
456 schedule::Scheduler::new(streaming_job_id, default_parallelism, expected_vnode_count)?;
457
458 let distributions = scheduler.schedule(&fragment_graph)?;
459
460 let mut fragment_graph = fragment_graph;
462 for (id, fragment) in fragment_graph.building_fragments_mut() {
463 let mut error = None;
464 let fragment_vnode_count = distributions[id].vnode_count();
465 visit_tables(fragment, |table, _| {
466 if error.is_some() {
467 return;
468 }
469 let vnode_count = if table.is_singleton() {
472 if fragment_vnode_count > 1 {
473 tracing::info!(
474 table.name,
475 "found singleton table in hash-distributed fragment"
476 );
477 }
478 1
479 } else {
480 fragment_vnode_count
481 };
482 match table.vnode_count_inner().value_opt() {
483 Some(required_vnode_count) if required_vnode_count != vnode_count => {
490 error = Some(format!(
491 "failed to align vnode count for table {}({}): required {}, but got {}",
492 table.id, table.name, required_vnode_count, vnode_count
493 ));
494 }
495 _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
497 }
498 });
499 if let Some(error) = error {
500 bail!(error);
501 }
502 }
503
504 Ok(Self {
505 distributions,
506 fragment_graph,
507 })
508 }
509
510 pub fn generate_graph(self) -> MetaResult<ActorGraphBuildResult> {
513 let ActorGraphBuildStateInner {
515 fragment_actor_builders,
516 downstream_fragment_changes,
517 upstream_fragment_changes,
518 } = self.build_actor_graph()?;
519
520 let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
521 let graph = {
523 let mut fragment_nodes: HashMap<GlobalFragmentId, StreamNode> = HashMap::new();
526
527 for (fragment_id, builder) in fragment_actor_builders {
528 let global_fragment_id = fragment_id.as_global_id();
529 let node = builder.rewrite()?;
530 downstream_fragment_relations
531 .try_insert(
532 global_fragment_id,
533 builder
534 .downstreams
535 .into_iter()
536 .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
537 .collect(),
538 )
539 .expect("non-duplicate");
540 fragment_nodes
541 .try_insert(fragment_id, node)
542 .expect("non-duplicate");
543 }
544
545 let mut graph = BTreeMap::new();
546 for (fragment_id, stream_node) in fragment_nodes {
547 let distribution = self.distributions[&fragment_id].clone();
548 let fragment =
549 self.fragment_graph
550 .seal_fragment(fragment_id, distribution, stream_node);
551 let fragment_id = fragment_id.as_global_id();
552 graph.insert(fragment_id, fragment);
553 }
554 graph
555 };
556
557 let upstream_fragment_downstreams = upstream_fragment_changes
559 .into_iter()
560 .map(|(fragment_id, changes)| {
561 (
562 fragment_id.as_global_id(),
563 changes
564 .new_downstreams
565 .into_iter()
566 .map(|(downstream_fragment_id, new_dispatch)| {
567 (downstream_fragment_id.as_global_id(), new_dispatch).into()
568 })
569 .collect(),
570 )
571 })
572 .collect();
573
574 let replace_upstream = downstream_fragment_changes
576 .into_iter()
577 .map(|(fragment_id, changes)| {
578 let fragment_id = fragment_id.as_global_id();
579 (
580 fragment_id,
581 changes
582 .new_upstreams
583 .into_iter()
584 .map(move |(edge_id, upstream_fragment_id)| {
585 let upstream_fragment_id = upstream_fragment_id.as_global_id();
586 let DownstreamExternalEdgeId {
587 original_upstream_fragment_id,
588 ..
589 } = edge_id;
590 (
591 original_upstream_fragment_id.as_global_id(),
592 upstream_fragment_id,
593 )
594 })
595 .collect(),
596 )
597 })
598 .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
599 .collect();
600
601 Ok(ActorGraphBuildResult {
602 graph,
603 downstream_fragment_relations,
604 upstream_fragment_downstreams,
605 replace_upstream,
606 })
607 }
608
609 fn build_actor_graph(&self) -> MetaResult<ActorGraphBuildStateInner> {
611 let mut state = ActorGraphBuildState::new();
612
613 for fragment_id in self.fragment_graph.topo_order()? {
616 self.build_actor_graph_fragment(fragment_id, &mut state)?;
617 }
618
619 Ok(state.finish())
620 }
621
622 fn build_actor_graph_fragment(
624 &self,
625 fragment_id: GlobalFragmentId,
626 state: &mut ActorGraphBuildState,
627 ) -> MetaResult<()> {
628 let current_fragment = self.fragment_graph.get_fragment(fragment_id);
629
630 match current_fragment {
632 EitherFragment::Building(current_fragment) => {
634 let node = current_fragment.node.clone().unwrap();
635 state
636 .inner
637 .fragment_actor_builders
638 .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
639 .expect("non-duplicate");
640 }
641
642 EitherFragment::Existing => {}
644 };
645
646 for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
648 state.inner.add_link(
649 FragmentLinkNode { fragment_id },
650 FragmentLinkNode {
651 fragment_id: downstream_fragment_id,
652 },
653 edge,
654 );
655 }
656
657 Ok(())
658 }
659}