1use std::collections::{BTreeMap, HashMap};
16
17use assert_matches::assert_matches;
18use risingwave_common::bail;
19use risingwave_common::hash::{IsSingleton, VnodeCount, VnodeCountCompat};
20use risingwave_common::util::iter_util::ZipEqFast;
21use risingwave_common::util::stream_graph_visitor::visit_tables;
22use risingwave_pb::stream_plan::stream_node::NodeBody;
23use risingwave_pb::stream_plan::{
24 DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
25};
26
27use crate::MetaResult;
28use crate::model::{Fragment, FragmentDownstreamRelation, FragmentId, FragmentReplaceUpstream};
29use crate::stream::stream_graph::fragment::{
30 CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
31 StreamFragmentEdge,
32};
33use crate::stream::stream_graph::id::GlobalFragmentId;
34use crate::stream::stream_graph::schedule;
35use crate::stream::stream_graph::schedule::Distribution;
36
37impl FragmentActorBuilder {
38 fn rewrite(&self) -> MetaResult<StreamNode> {
44 self.rewrite_inner(&self.node, 0)
45 }
46
47 fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
48 match stream_node.get_node_body()? {
49 NodeBody::Exchange(exchange) => {
51 if depth == 0 {
54 bail!(
55 "there should be no ExchangeNode on the top of the plan node: {:#?}",
56 stream_node
57 )
58 }
59 assert!(!stream_node.get_fields().is_empty());
60 assert!(stream_node.input.is_empty());
61
62 let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
64 link_id: stream_node.get_operator_id().as_raw_id(),
65 }];
66
67 let upstream_fragment_id = upstream_fragment_id.as_global_id();
68
69 Ok(StreamNode {
70 node_body: Some(NodeBody::Merge(Box::new({
71 MergeNode {
72 upstream_fragment_id,
73 upstream_dispatcher_type: exchange.get_strategy()?.r#type,
74 ..Default::default()
75 }
76 }))),
77 identity: "MergeExecutor".to_owned(),
78 ..stream_node.clone()
79 })
80 }
81
82 NodeBody::StreamScan(stream_scan) => {
84 let input = stream_node.get_input();
85 if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
86 assert!(input.is_empty());
89 return Ok(stream_node.clone());
90 }
91 assert_eq!(input.len(), 2);
92
93 let merge_node = &input[0];
94 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
95 let batch_plan_node = &input[1];
96 assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
97
98 let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
100 [&EdgeId::UpstreamExternal {
101 upstream_job_id: stream_scan.table_id.as_job_id(),
102 downstream_fragment_id: self.fragment_id,
103 }];
104
105 let is_shuffled_backfill = stream_scan.stream_scan_type
106 == StreamScanType::ArrangementBackfill as i32
107 || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
108 if !is_shuffled_backfill {
109 assert!(*upstream_no_shuffle_actor);
110 }
111
112 let upstream_dispatcher_type = if is_shuffled_backfill {
113 DispatcherType::Hash as _
116 } else {
117 DispatcherType::NoShuffle as _
118 };
119
120 let upstream_fragment_id = upstream_fragment_id.as_global_id();
121
122 let input = vec![
123 StreamNode {
125 node_body: Some(NodeBody::Merge(Box::new({
126 MergeNode {
127 upstream_fragment_id,
128 upstream_dispatcher_type,
129 ..Default::default()
130 }
131 }))),
132 ..merge_node.clone()
133 },
134 batch_plan_node.clone(),
135 ];
136
137 Ok(StreamNode {
138 input,
139 ..stream_node.clone()
140 })
141 }
142
143 NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
147 let input = stream_node.get_input();
148 assert_eq!(input.len(), 1);
149
150 let merge_node = &input[0];
151 assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
152
153 let upstream_source_id = match stream_node.get_node_body()? {
154 NodeBody::CdcFilter(node) => node.upstream_source_id,
155 NodeBody::SourceBackfill(node) => node.upstream_source_id,
156 _ => unreachable!(),
157 };
158
159 let (upstream_fragment_id, upstream_is_no_shuffle) = &self.upstreams
161 [&EdgeId::UpstreamExternal {
162 upstream_job_id: upstream_source_id.as_share_source_job_id(),
163 downstream_fragment_id: self.fragment_id,
164 }];
165
166 assert!(
167 *upstream_is_no_shuffle,
168 "Upstream Cdc Source should be singleton. \
169 SourceBackfill is NoShuffle 1-1 correspondence. \
170 So they both should have only one upstream actor."
171 );
172
173 let upstream_fragment_id = upstream_fragment_id.as_global_id();
174
175 let input = vec![
177 StreamNode {
179 node_body: Some(NodeBody::Merge(Box::new({
180 MergeNode {
181 upstream_fragment_id,
182 upstream_dispatcher_type: DispatcherType::NoShuffle as _,
183 ..Default::default()
184 }
185 }))),
186 ..merge_node.clone()
187 },
188 ];
189 Ok(StreamNode {
190 input,
191 ..stream_node.clone()
192 })
193 }
194
195 _ => {
197 let mut new_stream_node = stream_node.clone();
198 for (input, new_input) in stream_node
199 .input
200 .iter()
201 .zip_eq_fast(&mut new_stream_node.input)
202 {
203 *new_input = self.rewrite_inner(input, depth + 1)?;
204 }
205 Ok(new_stream_node)
206 }
207 }
208 }
209}
210
/// Changes to apply to an existing fragment (one without a builder) that gains new
/// downstream fragments.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// New dispatchers, keyed by the downstream fragment they dispatch to.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
220
/// Changes to apply to an existing fragment (one without a builder) that receives a newly
/// built fragment as upstream.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// New upstream fragment for each external edge that ends at this existing fragment.
    new_upstreams: HashMap<DownstreamExternalEdgeId, GlobalFragmentId>,
}
227
228impl UpstreamFragmentChange {
229 fn add_dispatcher(
231 &mut self,
232 downstream_fragment_id: GlobalFragmentId,
233 dispatch: DispatchStrategy,
234 ) {
235 self.new_downstreams
236 .try_insert(downstream_fragment_id, dispatch)
237 .unwrap();
238 }
239}
240
241impl DownstreamFragmentChange {
242 fn add_upstream(
244 &mut self,
245 edge_id: DownstreamExternalEdgeId,
246 new_upstream_fragment_id: GlobalFragmentId,
247 ) {
248 self.new_upstreams
249 .try_insert(edge_id, new_upstream_fragment_id)
250 .unwrap();
251 }
252}
253
/// Link state collected for a fragment that is being built.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// Id of the fragment being built.
    fragment_id: GlobalFragmentId,
    /// Root stream node of the fragment; rewritten by `rewrite` after all links are recorded.
    node: StreamNode,
    /// Dispatch strategy towards each downstream fragment.
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    /// Upstream fragment of each incoming edge, paired with whether that edge is `NoShuffle`.
    upstreams: HashMap<EdgeId, (GlobalFragmentId, bool)>,
}
262
263impl FragmentActorBuilder {
264 fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
265 Self {
266 fragment_id,
267 node,
268 downstreams: Default::default(),
269 upstreams: Default::default(),
270 }
271 }
272}
273
/// State accumulated while walking the fragment graph in `build_actor_graph`.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// One builder per fragment being built, keyed by fragment id.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// New upstreams for existing fragments, keyed by the existing (downstream) fragment id.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// New dispatchers for existing fragments, keyed by the existing (upstream) fragment id.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
}
293
/// One endpoint (upstream or downstream fragment) of a link passed to `add_link`.
struct FragmentLinkNode {
    fragment_id: GlobalFragmentId,
}
298
299impl ActorGraphBuildStateInner {
300 fn add_dispatcher(
305 &mut self,
306 fragment_id: GlobalFragmentId,
307 downstream_fragment_id: GlobalFragmentId,
308 dispatch: DispatchStrategy,
309 ) {
310 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
311 builder
312 .downstreams
313 .try_insert(downstream_fragment_id, dispatch)
314 .unwrap();
315 } else {
316 self.upstream_fragment_changes
317 .entry(fragment_id)
318 .or_default()
319 .add_dispatcher(downstream_fragment_id, dispatch);
320 }
321 }
322
323 fn add_upstream(
328 &mut self,
329 fragment_id: GlobalFragmentId,
330 edge_id: EdgeId,
331 upstream_fragment_id: GlobalFragmentId,
332 is_no_shuffle: bool,
333 ) {
334 if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
335 builder
336 .upstreams
337 .try_insert(edge_id, (upstream_fragment_id, is_no_shuffle))
338 .unwrap();
339 } else {
340 let EdgeId::DownstreamExternal(edge_id) = edge_id else {
341 unreachable!("edge from internal to external must be `DownstreamExternal`")
342 };
343 self.downstream_fragment_changes
344 .entry(fragment_id)
345 .or_default()
346 .add_upstream(edge_id, upstream_fragment_id);
347 }
348 }
349
350 fn add_link(
359 &mut self,
360 upstream: FragmentLinkNode,
361 downstream: FragmentLinkNode,
362 edge: &StreamFragmentEdge,
363 ) {
364 let dt = edge.dispatch_strategy.r#type();
365
366 match dt {
367 DispatcherType::NoShuffle => {
369 self.add_dispatcher(
371 upstream.fragment_id,
372 downstream.fragment_id,
373 edge.dispatch_strategy.clone(),
374 );
375
376 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, true);
378 }
379
380 DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
382 self.add_dispatcher(
383 upstream.fragment_id,
384 downstream.fragment_id,
385 edge.dispatch_strategy.clone(),
386 );
387 self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, false);
388 }
389
390 DispatcherType::Unspecified => unreachable!(),
391 }
392 }
393}
394
/// Wrapper around [`ActorGraphBuildStateInner`] threaded through `build_actor_graph`.
struct ActorGraphBuildState {
    /// The accumulated state; handed out by `finish`.
    inner: ActorGraphBuildStateInner,
}
400
401impl ActorGraphBuildState {
402 fn new() -> Self {
404 Self {
405 inner: Default::default(),
406 }
407 }
408
409 fn finish(self) -> ActorGraphBuildStateInner {
411 self.inner
412 }
413}
414
/// Output of [`ActorGraphBuilder::generate_graph`].
pub struct ActorGraphBuildResult {
    /// The sealed fragments that were built, keyed by fragment id.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// Downstream dispatchers of each built fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// New downstream dispatchers to add to existing upstream fragments.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// For each existing downstream fragment, maps the original upstream fragment id of an
    /// edge to the built fragment that is now its upstream.
    pub replace_upstream: FragmentReplaceUpstream,
}
431
/// Turns a [`CompleteStreamFragmentGraph`] into sealed fragments plus the dispatcher and
/// upstream relations between them.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// Scheduled distribution of each fragment.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The fragment graph, with table vnode counts aligned to `distributions` in `new`.
    fragment_graph: CompleteStreamFragmentGraph,
}
442
impl ActorGraphBuilder {
    /// Creates a builder for `fragment_graph`.
    ///
    /// Schedules the graph to obtain a [`Distribution`] per fragment. It then sets the vnode
    /// count of every table in the building fragments to match its fragment's distribution;
    /// singleton tables always get a vnode count of 1.
    ///
    /// # Errors
    /// Returns an error in any of these cases:
    /// - creating the scheduler fails;
    /// - scheduling fails;
    /// - a table already requires a vnode count that differs from the derived one.
    pub fn new(fragment_graph: CompleteStreamFragmentGraph) -> MetaResult<Self> {
        let expected_vnode_count = fragment_graph.max_parallelism();
        let scheduler = schedule::Scheduler::new(expected_vnode_count)?;

        let distributions = scheduler.schedule(&fragment_graph)?;

        // Re-bind mutably: table vnode counts inside building fragments are written below.
        let mut fragment_graph = fragment_graph;
        for (id, fragment) in fragment_graph.building_fragments_mut() {
            // The visitor closure cannot return an error, so the first failure is stashed
            // here and reported once the visit is over.
            let mut error = None;
            let fragment_vnode_count = distributions[id].vnode_count();
            visit_tables(fragment, |table, _| {
                // Skip the remaining tables once an error has been recorded.
                if error.is_some() {
                    return;
                }
                let vnode_count = if table.is_singleton() {
                    // Singleton tables use a single vnode even in a fragment with more;
                    // the mismatch is only logged.
                    if fragment_vnode_count > 1 {
                        tracing::info!(
                            table.name,
                            "found singleton table in hash-distributed fragment"
                        );
                    }
                    1
                } else {
                    fragment_vnode_count
                };
                match table.vnode_count_inner().value_opt() {
                    // The table already carries a vnode count that conflicts with the derived one.
                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
                        error = Some(format!(
                            "failed to align vnode count for table {}({}): required {}, but got {}",
                            table.id, table.name, required_vnode_count, vnode_count
                        ));
                    }
                    // Otherwise (no value, or an equal one) store the derived vnode count.
                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
                }
            });
            if let Some(error) = error {
                bail!(error);
            }
        }

        Ok(Self {
            distributions,
            fragment_graph,
        })
    }

    /// Consumes the builder and produces the final [`ActorGraphBuildResult`].
    ///
    /// Each building fragment's stream node is rewritten with `FragmentActorBuilder::rewrite`.
    /// It is then sealed with its scheduled distribution. Links recorded against existing
    /// fragments are returned as `upstream_fragment_downstreams` and `replace_upstream`.
    ///
    /// # Errors
    /// Returns an error if collecting the links or rewriting any fragment's node fails.
    pub fn generate_graph(self) -> MetaResult<ActorGraphBuildResult> {
        let ActorGraphBuildStateInner {
            fragment_actor_builders,
            downstream_fragment_changes,
            upstream_fragment_changes,
        } = self.build_actor_graph()?;

        // Dispatchers of every building fragment, keyed by that fragment's id.
        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
        let graph = {
            // All fragments are rewritten before any is sealed, so a rewrite error aborts
            // before sealing starts.
            let mut fragment_nodes: HashMap<GlobalFragmentId, StreamNode> = HashMap::new();

            for (fragment_id, builder) in fragment_actor_builders {
                let global_fragment_id = fragment_id.as_global_id();
                let node = builder.rewrite()?;
                downstream_fragment_relations
                    .try_insert(
                        global_fragment_id,
                        builder
                            .downstreams
                            .into_iter()
                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
                            .collect(),
                    )
                    .expect("non-duplicate");
                fragment_nodes
                    .try_insert(fragment_id, node)
                    .expect("non-duplicate");
            }

            // Seal each rewritten node with the distribution chosen by the scheduler.
            let mut graph = BTreeMap::new();
            for (fragment_id, stream_node) in fragment_nodes {
                let distribution = self.distributions[&fragment_id].clone();
                let fragment =
                    self.fragment_graph
                        .seal_fragment(fragment_id, distribution, stream_node);
                let fragment_id = fragment_id.as_global_id();
                graph.insert(fragment_id, fragment);
            }
            graph
        };

        // New dispatchers for existing upstream fragments.
        let upstream_fragment_downstreams = upstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                (
                    fragment_id.as_global_id(),
                    changes
                        .new_downstreams
                        .into_iter()
                        .map(|(downstream_fragment_id, new_dispatch)| {
                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
                        })
                        .collect(),
                )
            })
            .collect();

        // For each existing downstream fragment: original upstream fragment id -> new
        // upstream fragment id. Fragments with no changes are dropped.
        let replace_upstream = downstream_fragment_changes
            .into_iter()
            .map(|(fragment_id, changes)| {
                let fragment_id = fragment_id.as_global_id();
                (
                    fragment_id,
                    changes
                        .new_upstreams
                        .into_iter()
                        .map(move |(edge_id, upstream_fragment_id)| {
                            let upstream_fragment_id = upstream_fragment_id.as_global_id();
                            let DownstreamExternalEdgeId {
                                original_upstream_fragment_id,
                                ..
                            } = edge_id;
                            (
                                original_upstream_fragment_id.as_global_id(),
                                upstream_fragment_id,
                            )
                        })
                        .collect(),
                )
            })
            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
            .collect();

        Ok(ActorGraphBuildResult {
            graph,
            downstream_fragment_relations,
            upstream_fragment_downstreams,
            replace_upstream,
        })
    }

    /// Visits every fragment in the order given by `topo_order` and records its links.
    ///
    /// # Errors
    /// Returns an error if `topo_order` fails or processing a fragment fails.
    fn build_actor_graph(&self) -> MetaResult<ActorGraphBuildStateInner> {
        let mut state = ActorGraphBuildState::new();

        for fragment_id in self.fragment_graph.topo_order()? {
            self.build_actor_graph_fragment(fragment_id, &mut state)?;
        }

        Ok(state.finish())
    }

    /// Registers a builder for `fragment_id` when it is being built. It then records a link
    /// to each of its downstream fragments.
    ///
    /// # Panics
    /// Panics if a building fragment has no root node or is registered twice.
    fn build_actor_graph_fragment(
        &self,
        fragment_id: GlobalFragmentId,
        state: &mut ActorGraphBuildState,
    ) -> MetaResult<()> {
        let current_fragment = self.fragment_graph.get_fragment(fragment_id);

        match current_fragment {
            // A building fragment gets a builder holding a copy of its root stream node.
            EitherFragment::Building(current_fragment) => {
                let node = current_fragment.node.clone().unwrap();
                state
                    .inner
                    .fragment_actor_builders
                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
                    .expect("non-duplicate");
            }

            // Existing fragments get no builder; links touching them are recorded as
            // upstream/downstream fragment changes instead.
            EitherFragment::Existing => {}
        };

        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
            state.inner.add_link(
                FragmentLinkNode { fragment_id },
                FragmentLinkNode {
                    fragment_id: downstream_fragment_id,
                },
                edge,
            );
        }

        Ok(())
    }
}