risingwave_meta/stream/stream_graph/
actor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{IsSingleton, VnodeCount, WorkerSlotId};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::stream_graph_visitor::visit_tables;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::plan_common::ExprContext;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{
29    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
30};
31
32use super::Locations;
33use crate::MetaResult;
34use crate::controller::cluster::StreamingClusterInfo;
35use crate::manager::{MetaSrvEnv, StreamingJob};
36use crate::model::{
37    Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
38    FragmentReplaceUpstream, StreamActor,
39};
40use crate::stream::stream_graph::fragment::{
41    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
42    StreamFragmentEdge,
43};
44use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
45use crate::stream::stream_graph::schedule;
46use crate::stream::stream_graph::schedule::Distribution;
47
/// [`ActorBuilder`] builds a stream actor in a stream DAG.
///
/// It only carries the per-actor data; the plan shared by all actors of a fragment is kept in
/// [`FragmentActorBuilder`].
#[derive(Debug)]
struct ActorBuilder {
    /// The ID of this actor.
    actor_id: GlobalActorId,

    /// The fragment ID of this actor.
    fragment_id: GlobalFragmentId,

    /// The virtual node bitmap, if this fragment is hash distributed.
    vnode_bitmap: Option<Bitmap>,
}
60
61impl ActorBuilder {
62    fn new(
63        actor_id: GlobalActorId,
64        fragment_id: GlobalFragmentId,
65        vnode_bitmap: Option<Bitmap>,
66    ) -> Self {
67        Self {
68            actor_id,
69            fragment_id,
70            vnode_bitmap,
71        }
72    }
73}
74
75impl FragmentActorBuilder {
76    /// Rewrite the actor body.
77    ///
78    /// During this process, the following things will be done:
79    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
80    ///    compute nodes.
81    fn rewrite(&self) -> MetaResult<StreamNode> {
82        self.rewrite_inner(&self.node, 0)
83    }
84
85    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
86        match stream_node.get_node_body()? {
87            // Leaf node `Exchange`.
88            NodeBody::Exchange(exchange) => {
89                // The exchange node should always be the bottom of the plan node. If we find one
90                // when the depth is 0, it means that the plan node is not well-formed.
91                if depth == 0 {
92                    bail!(
93                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
94                        stream_node
95                    )
96                }
97                assert!(!stream_node.get_fields().is_empty());
98                assert!(stream_node.input.is_empty());
99
100                // Index the upstreams by the an internal edge ID.
101                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
102                    link_id: stream_node.get_operator_id(),
103                }];
104
105                let upstream_fragment_id = upstream_fragment_id.as_global_id();
106
107                Ok(StreamNode {
108                    node_body: Some(NodeBody::Merge(Box::new({
109                        #[expect(deprecated)]
110                        MergeNode {
111                            upstream_actor_id: vec![],
112                            upstream_fragment_id,
113                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
114                            fields: stream_node.get_fields().clone(),
115                        }
116                    }))),
117                    identity: "MergeExecutor".to_owned(),
118                    ..stream_node.clone()
119                })
120            }
121
122            // "Leaf" node `StreamScan`.
123            NodeBody::StreamScan(stream_scan) => {
124                let input = stream_node.get_input();
125                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
126                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
127                    // and always reads from log store.
128                    assert!(input.is_empty());
129                    return Ok(stream_node.clone());
130                }
131                assert_eq!(input.len(), 2);
132
133                let merge_node = &input[0];
134                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
135                let batch_plan_node = &input[1];
136                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
137
138                // Index the upstreams by the an external edge ID.
139                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
140                    [&EdgeId::UpstreamExternal {
141                        upstream_table_id: stream_scan.table_id.into(),
142                        downstream_fragment_id: self.fragment_id,
143                    }];
144
145                let is_shuffled_backfill = stream_scan.stream_scan_type
146                    == StreamScanType::ArrangementBackfill as i32
147                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
148                if !is_shuffled_backfill {
149                    assert!(upstream_no_shuffle_actor.is_some());
150                }
151
152                let upstream_dispatcher_type = if is_shuffled_backfill {
153                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
154                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
155                    DispatcherType::Hash as _
156                } else {
157                    DispatcherType::NoShuffle as _
158                };
159
160                let upstream_fragment_id = upstream_fragment_id.as_global_id();
161
162                let input = vec![
163                    // Fill the merge node body with correct upstream info.
164                    StreamNode {
165                        node_body: Some(NodeBody::Merge(Box::new({
166                            #[expect(deprecated)]
167                            MergeNode {
168                                upstream_actor_id: vec![],
169                                upstream_fragment_id,
170                                upstream_dispatcher_type,
171                                fields: merge_node.fields.clone(),
172                            }
173                        }))),
174                        ..merge_node.clone()
175                    },
176                    batch_plan_node.clone(),
177                ];
178
179                Ok(StreamNode {
180                    input,
181                    ..stream_node.clone()
182                })
183            }
184
185            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
186            // cdc_filter -> backfill -> mview
187            // source_backfill -> mview
188            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
189                let input = stream_node.get_input();
190                assert_eq!(input.len(), 1);
191
192                let merge_node = &input[0];
193                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
194
195                let upstream_source_id = match stream_node.get_node_body()? {
196                    NodeBody::CdcFilter(node) => node.upstream_source_id,
197                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
198                    _ => unreachable!(),
199                };
200
201                // Index the upstreams by the an external edge ID.
202                let (upstream_fragment_id, upstream_actors) = &self.upstreams
203                    [&EdgeId::UpstreamExternal {
204                        upstream_table_id: upstream_source_id.into(),
205                        downstream_fragment_id: self.fragment_id,
206                    }];
207
208                assert!(
209                    upstream_actors.is_some(),
210                    "Upstream Cdc Source should be singleton. \
211                    SourceBackfill is NoShuffle 1-1 correspondence. \
212                    So they both should have only one upstream actor."
213                );
214
215                let upstream_fragment_id = upstream_fragment_id.as_global_id();
216
217                // rewrite the input
218                let input = vec![
219                    // Fill the merge node body with correct upstream info.
220                    StreamNode {
221                        node_body: Some(NodeBody::Merge(Box::new({
222                            #[expect(deprecated)]
223                            MergeNode {
224                                upstream_actor_id: vec![],
225                                upstream_fragment_id,
226                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
227                                fields: merge_node.fields.clone(),
228                            }
229                        }))),
230                        ..merge_node.clone()
231                    },
232                ];
233                Ok(StreamNode {
234                    input,
235                    ..stream_node.clone()
236                })
237            }
238
239            // For other nodes, visit the children recursively.
240            _ => {
241                let mut new_stream_node = stream_node.clone();
242                for (input, new_input) in stream_node
243                    .input
244                    .iter()
245                    .zip_eq_fast(&mut new_stream_node.input)
246                {
247                    *new_input = self.rewrite_inner(input, depth + 1)?;
248                }
249                Ok(new_stream_node)
250            }
251        }
252    }
253}
254
255impl ActorBuilder {
256    /// Build an actor after all the upstreams and downstreams are processed.
257    fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
258        // Only fill the definition when debug assertions enabled, otherwise use name instead.
259        #[cfg(not(debug_assertions))]
260        let mview_definition = job.name();
261        #[cfg(debug_assertions)]
262        let mview_definition = job.definition();
263
264        Ok(StreamActor {
265            actor_id: self.actor_id.as_global_id(),
266            fragment_id: self.fragment_id.as_global_id(),
267            vnode_bitmap: self.vnode_bitmap,
268            mview_definition,
269            expr_context: Some(expr_context),
270        })
271    }
272}
273
/// The required changes to an existing external upstream fragment to build the graph of a
/// streaming job.
///
/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
/// to the upstream actors, by adding new dispatchers.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The new downstreams to be added, indexed by the downstream fragment ID. Each entry holds
    /// the dispatch strategy used towards that downstream fragment.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
283
/// The required changes to an existing external downstream fragment to build the graph of a
/// streaming job.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
    /// `edge_id` -> (new upstream fragment id, no-shuffle actor mapping if the edge is no-shuffle
    /// dispatched)
    new_upstreams:
        HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
291
292impl UpstreamFragmentChange {
293    /// Add a dispatcher to the external actor.
294    fn add_dispatcher(
295        &mut self,
296        downstream_fragment_id: GlobalFragmentId,
297        dispatch: DispatchStrategy,
298    ) {
299        self.new_downstreams
300            .try_insert(downstream_fragment_id, dispatch)
301            .unwrap();
302    }
303}
304
305impl DownstreamFragmentChange {
306    /// Add an upstream to the external actor.
307    fn add_upstream(
308        &mut self,
309        edge_id: DownstreamExternalEdgeId,
310        new_upstream_fragment_id: GlobalFragmentId,
311        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
312    ) {
313        self.new_upstreams
314            .try_insert(
315                edge_id,
316                (new_upstream_fragment_id, no_shuffle_actor_mapping),
317            )
318            .unwrap();
319    }
320}
321
/// The worker slot location of actors.
type ActorLocations = BTreeMap<GlobalActorId, WorkerSlotId>;
/// The actor pairs of a no-shuffle edge: no-shuffle upstream `actor_id` -> `actor_id`.
type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
326
/// [`FragmentActorBuilder`] collects everything needed to build the actors of one fragment: the
/// fragment plan, the builders of its actors and its relations to other fragments.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The ID of the fragment.
    fragment_id: GlobalFragmentId,
    /// The plan of the fragment, which is the input of `rewrite`.
    node: StreamNode,
    /// The builders of the actors of this fragment, indexed by actor ID.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// downstream fragment_id -> dispatch strategy towards that fragment
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    // edge_id -> (upstream fragment_id, no shuffle actor pairs if it's no shuffle dispatched)
    upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
336
337impl FragmentActorBuilder {
338    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
339        Self {
340            fragment_id,
341            node,
342            actor_builders: Default::default(),
343            downstreams: Default::default(),
344            upstreams: Default::default(),
345        }
346    }
347}
348
/// The actual mutable state of building an actor graph.
///
/// When the fragments are visited in a topological order, actor builders will be added to this
/// state and the scheduled locations will be added. As the building process is run on the
/// **complete graph** which also contains the info of the existing (external) fragments, the info
/// of them will be also recorded.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the actors to be built, grouped by the fragment they belong to.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The scheduled locations of the actors to be built.
    building_locations: ActorLocations,

    /// The required changes to the external downstream fragment. See [`DownstreamFragmentChange`].
    /// Indexed by the `fragment_id` of fragments that have updates on its downstream.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The required changes to the external upstream fragment. See [`UpstreamFragmentChange`].
    /// Indexed by the `fragment_id` of fragments that have updates on its upstream.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// The actual locations of the external actors.
    external_locations: ActorLocations,
}
374
/// The information of a fragment, used for parameter passing for `Inner::add_link`.
struct FragmentLinkNode<'a> {
    /// The ID of the fragment on this end of the link.
    fragment_id: GlobalFragmentId,
    /// The IDs of the actors of the fragment.
    actor_ids: &'a [GlobalActorId],
}
380
381impl ActorGraphBuildStateInner {
382    /// Insert new generated actor and record its location.
383    ///
384    /// The `vnode_bitmap` should be `Some` for the actors of hash-distributed fragments.
385    fn add_actor(
386        &mut self,
387        (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
388        worker_slot_id: WorkerSlotId,
389        vnode_bitmap: Option<Bitmap>,
390    ) {
391        self.fragment_actor_builders
392            .get_mut(&fragment_id)
393            .expect("should added previously")
394            .actor_builders
395            .try_insert(
396                actor_id,
397                ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
398            )
399            .unwrap();
400
401        self.building_locations
402            .try_insert(actor_id, worker_slot_id)
403            .unwrap();
404    }
405
406    /// Record the location of an external actor.
407    fn record_external_location(&mut self, actor_id: GlobalActorId, worker_slot_id: WorkerSlotId) {
408        self.external_locations
409            .try_insert(actor_id, worker_slot_id)
410            .unwrap();
411    }
412
413    /// Add the new downstream fragment relation to a fragment.
414    ///
415    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
416    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
417    fn add_dispatcher(
418        &mut self,
419        fragment_id: GlobalFragmentId,
420        downstream_fragment_id: GlobalFragmentId,
421        dispatch: DispatchStrategy,
422    ) {
423        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
424            builder
425                .downstreams
426                .try_insert(downstream_fragment_id, dispatch)
427                .unwrap();
428        } else {
429            self.upstream_fragment_changes
430                .entry(fragment_id)
431                .or_default()
432                .add_dispatcher(downstream_fragment_id, dispatch);
433        }
434    }
435
436    /// Add the new upstream for an actor.
437    ///
438    /// - If the actor is to be built, the upstream will be added to the actor builder.
439    /// - If the actor is an external actor, the upstream will be added to the external changes.
440    fn add_upstream(
441        &mut self,
442        fragment_id: GlobalFragmentId,
443        edge_id: EdgeId,
444        upstream_fragment_id: GlobalFragmentId,
445        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
446    ) {
447        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
448            builder
449                .upstreams
450                .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
451                .unwrap();
452        } else {
453            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
454                unreachable!("edge from internal to external must be `DownstreamExternal`")
455            };
456            self.downstream_fragment_changes
457                .entry(fragment_id)
458                .or_default()
459                .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
460        }
461    }
462
463    /// Get the location of an actor. Will look up the location map of both the actors to be built
464    /// and the external actors.
465    fn get_location(&self, actor_id: GlobalActorId) -> WorkerSlotId {
466        self.building_locations
467            .get(&actor_id)
468            .copied()
469            .or_else(|| self.external_locations.get(&actor_id).copied())
470            .unwrap()
471    }
472
473    /// Add a "link" between two fragments in the graph.
474    ///
475    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
476    /// based on the distribution and the dispatch strategy. They will be
477    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
478    ///
479    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
480    /// instead of the actor builders.
481    fn add_link<'a>(
482        &mut self,
483        upstream: FragmentLinkNode<'a>,
484        downstream: FragmentLinkNode<'a>,
485        edge: &'a StreamFragmentEdge,
486    ) {
487        let dt = edge.dispatch_strategy.r#type();
488
489        match dt {
490            // For `NoShuffle`, make n "1-1" links between the actors.
491            DispatcherType::NoShuffle => {
492                assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
493                let upstream_locations: HashMap<_, _> = upstream
494                    .actor_ids
495                    .iter()
496                    .map(|id| (self.get_location(*id), *id))
497                    .collect();
498                let downstream_locations: HashMap<_, _> = downstream
499                    .actor_ids
500                    .iter()
501                    .map(|id| (self.get_location(*id), *id))
502                    .collect();
503
504                // Create a new dispatcher just between these two actors.
505                self.add_dispatcher(
506                    upstream.fragment_id,
507                    downstream.fragment_id,
508                    edge.dispatch_strategy.clone(),
509                );
510
511                // Also record the upstream for the downstream actor.
512                self.add_upstream(
513                    downstream.fragment_id,
514                    edge.id,
515                    upstream.fragment_id,
516                    Some(
517                        downstream_locations
518                            .iter()
519                            .map(|(location, downstream_actor_id)| {
520                                let upstream_actor_id = upstream_locations.get(location).unwrap();
521                                (*upstream_actor_id, *downstream_actor_id)
522                            })
523                            .collect(),
524                    ),
525                );
526            }
527
528            // Otherwise, make m * n links between the actors.
529            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
530                self.add_dispatcher(
531                    upstream.fragment_id,
532                    downstream.fragment_id,
533                    edge.dispatch_strategy.clone(),
534                );
535                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
536            }
537
538            DispatcherType::Unspecified => unreachable!(),
539        }
540    }
541}
542
/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
///
/// On top of the inner state, it keeps track of the actor IDs handed out so far.
struct ActorGraphBuildState {
    /// The actual state.
    inner: ActorGraphBuildStateInner,

    /// The actor IDs of each fragment.
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// The next local actor id to use. Starts from 0 and is increased by one for each generated
    /// actor id.
    next_local_id: u32,

    /// The global actor id generator.
    actor_id_gen: GlobalActorIdGen,
}
557
558impl ActorGraphBuildState {
559    /// Create an empty state with the given id generator.
560    fn new(actor_id_gen: GlobalActorIdGen) -> Self {
561        Self {
562            inner: Default::default(),
563            fragment_actors: Default::default(),
564            next_local_id: 0,
565            actor_id_gen,
566        }
567    }
568
569    /// Get the next global actor id.
570    fn next_actor_id(&mut self) -> GlobalActorId {
571        let local_id = self.next_local_id;
572        self.next_local_id += 1;
573
574        self.actor_id_gen.to_global_id(local_id)
575    }
576
577    /// Finish the build and return the inner state.
578    fn finish(self) -> ActorGraphBuildStateInner {
579        // Assert that all the actors are built.
580        assert_eq!(self.actor_id_gen.len(), self.next_local_id);
581
582        self.inner
583    }
584}
585
/// The result of a built actor graph. Will be further embedded into the `Context` for building
/// actors on the compute nodes.
pub struct ActorGraphBuildResult {
    /// The graph of sealed fragments, including all actors, indexed by fragment ID.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream fragments of the fragments from the new graph to be created.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The scheduled locations of the actors to be built.
    pub building_locations: Locations,

    /// The actual locations of the external actors.
    pub existing_locations: Locations,

    /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The updates to be applied to the downstream fragment merge node. Used for schema change (replace
    /// table plan).
    pub replace_upstream: FragmentReplaceUpstream,

    /// The new no shuffle added to create the new streaming job, including the no shuffle from existing fragments to
    /// the newly created fragments, between two newly created fragments, and from newly created fragments to existing
    /// downstream fragments (for create sink into table and replace table).
    pub new_no_shuffle: FragmentNewNoShuffle,
}
612
/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
/// current cluster info and the required parallelism.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The pre-scheduled distribution for each building fragment, as returned by the scheduler.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The actual distribution for each existing fragment.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph.
    fragment_graph: CompleteStreamFragmentGraph,

    /// The cluster info for creating a streaming job.
    cluster_info: StreamingClusterInfo,
}
629
630impl ActorGraphBuilder {
631    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
632    /// graph is failed to be scheduled.
633    pub fn new(
634        streaming_job_id: u32,
635        resource_group: String,
636        fragment_graph: CompleteStreamFragmentGraph,
637        cluster_info: StreamingClusterInfo,
638        default_parallelism: NonZeroUsize,
639    ) -> MetaResult<Self> {
640        let expected_vnode_count = fragment_graph.max_parallelism();
641        let existing_distributions = fragment_graph.existing_distribution();
642
643        let schedulable_workers =
644            cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
645
646        let scheduler = schedule::Scheduler::new(
647            streaming_job_id,
648            &schedulable_workers,
649            default_parallelism,
650            expected_vnode_count,
651        )?;
652
653        let distributions = scheduler.schedule(&fragment_graph)?;
654
655        // Fill the vnode count for each internal table, based on schedule result.
656        let mut fragment_graph = fragment_graph;
657        for (id, fragment) in fragment_graph.building_fragments_mut() {
658            let fragment_vnode_count = distributions[id].vnode_count();
659            visit_tables(fragment, |table, _| {
660                // There are special cases where a hash-distributed fragment contains singleton
661                // internal tables, e.g., the state table of `Source` executors.
662                let vnode_count = if table.is_singleton() {
663                    if fragment_vnode_count > 1 {
664                        tracing::info!(
665                            table.name,
666                            "found singleton table in hash-distributed fragment"
667                        );
668                    }
669                    1
670                } else {
671                    fragment_vnode_count
672                };
673                table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf();
674            })
675        }
676
677        Ok(Self {
678            distributions,
679            existing_distributions,
680            fragment_graph,
681            cluster_info,
682        })
683    }
684
685    /// Get the distribution of the given fragment. Will look up the distribution map of both the
686    /// building and existing fragments.
687    fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
688        self.distributions
689            .get(&fragment_id)
690            .or_else(|| self.existing_distributions.get(&fragment_id))
691            .unwrap()
692    }
693
694    /// Convert the actor location map to the [`Locations`] struct.
695    fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
696        let actor_locations = actor_locations
697            .into_iter()
698            .map(|(id, worker_slot_id)| (id.as_global_id(), worker_slot_id))
699            .collect();
700
701        let worker_locations = self
702            .cluster_info
703            .worker_nodes
704            .iter()
705            .map(|(id, node)| (*id as WorkerId, node.clone()))
706            .collect();
707
708        Locations {
709            actor_locations,
710            worker_locations,
711        }
712    }
713
714    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
715    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
716    pub fn generate_graph(
717        self,
718        env: &MetaSrvEnv,
719        job: &StreamingJob,
720        expr_context: ExprContext,
721    ) -> MetaResult<ActorGraphBuildResult> {
722        // Pre-generate IDs for all actors.
723        let actor_len = self
724            .distributions
725            .values()
726            .map(|d| d.parallelism())
727            .sum::<usize>() as u64;
728        let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len);
729
730        // Build the actor graph and get the final state.
731        let ActorGraphBuildStateInner {
732            fragment_actor_builders,
733            building_locations,
734            downstream_fragment_changes,
735            upstream_fragment_changes,
736            external_locations,
737        } = self.build_actor_graph(id_gen)?;
738
739        for worker_slot_id in external_locations.values() {
740            if self
741                .cluster_info
742                .unschedulable_workers
743                .contains(&worker_slot_id.worker_id())
744            {
745                bail!(
746                    "The worker {} where the associated upstream is located is unschedulable",
747                    worker_slot_id.worker_id(),
748                );
749            }
750        }
751
752        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
753        let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
754        // Serialize the graph into a map of sealed fragments.
755        let graph = {
756            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
757                HashMap::new();
758
759            // As all fragments are processed, we can now `build` the actors where the `Exchange`
760            // and `Chain` are rewritten.
761            for (fragment_id, builder) in fragment_actor_builders {
762                let global_fragment_id = fragment_id.as_global_id();
763                let node = builder.rewrite()?;
764                for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
765                    if let Some(no_shuffle_upstream) = no_shuffle_upstream {
766                        new_no_shuffle
767                            .entry(upstream_fragment_id.as_global_id())
768                            .or_default()
769                            .try_insert(
770                                global_fragment_id,
771                                no_shuffle_upstream
772                                    .iter()
773                                    .map(|(upstream_actor_id, actor_id)| {
774                                        (upstream_actor_id.as_global_id(), actor_id.as_global_id())
775                                    })
776                                    .collect(),
777                            )
778                            .expect("non-duplicate");
779                    }
780                }
781                downstream_fragment_relations
782                    .try_insert(
783                        global_fragment_id,
784                        builder
785                            .downstreams
786                            .into_iter()
787                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
788                            .collect(),
789                    )
790                    .expect("non-duplicate");
791                fragment_actors
792                    .try_insert(
793                        fragment_id,
794                        (
795                            node,
796                            builder
797                                .actor_builders
798                                .into_values()
799                                .map(|builder| builder.build(job, expr_context.clone()))
800                                .try_collect()?,
801                        ),
802                    )
803                    .expect("non-duplicate");
804            }
805
806            {
807                fragment_actors
808                    .into_iter()
809                    .map(|(fragment_id, (stream_node, actors))| {
810                        let distribution = self.distributions[&fragment_id].clone();
811                        let fragment = self.fragment_graph.seal_fragment(
812                            fragment_id,
813                            actors,
814                            distribution,
815                            stream_node,
816                        );
817                        let fragment_id = fragment_id.as_global_id();
818                        (fragment_id, fragment)
819                    })
820                    .collect()
821            }
822        };
823
824        // Convert the actor location map to the `Locations` struct.
825        let building_locations = self.build_locations(building_locations);
826        let existing_locations = self.build_locations(external_locations);
827
828        // Extract the new fragment relation from the external changes.
829        let upstream_fragment_downstreams = upstream_fragment_changes
830            .into_iter()
831            .map(|(fragment_id, changes)| {
832                (
833                    fragment_id.as_global_id(),
834                    changes
835                        .new_downstreams
836                        .into_iter()
837                        .map(|(downstream_fragment_id, new_dispatch)| {
838                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
839                        })
840                        .collect(),
841                )
842            })
843            .collect();
844
845        // Extract the updates for merge executors from the external changes.
846        let replace_upstream = downstream_fragment_changes
847            .into_iter()
848            .map(|(fragment_id, changes)| {
849                let fragment_id = fragment_id.as_global_id();
850                let new_no_shuffle = &mut new_no_shuffle;
851                (
852                    fragment_id,
853                    changes
854                        .new_upstreams
855                        .into_iter()
856                        .map(
857                            move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
858                                let upstream_fragment_id = upstream_fragment_id.as_global_id();
859                                if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
860                                    && !upstream_new_no_shuffle.is_empty()
861                                {
862                                    let no_shuffle_actors = new_no_shuffle
863                                        .entry(upstream_fragment_id)
864                                        .or_default()
865                                        .entry(fragment_id)
866                                        .or_default();
867                                    no_shuffle_actors.extend(
868                                        upstream_new_no_shuffle.into_iter().map(
869                                            |(upstream_actor_id, actor_id)| {
870                                                (
871                                                    upstream_actor_id.as_global_id(),
872                                                    actor_id.as_global_id(),
873                                                )
874                                            },
875                                        ),
876                                    );
877                                }
878                                let DownstreamExternalEdgeId {
879                                    original_upstream_fragment_id,
880                                    ..
881                                } = edge_id;
882                                (
883                                    original_upstream_fragment_id.as_global_id(),
884                                    upstream_fragment_id,
885                                )
886                            },
887                        )
888                        .collect(),
889                )
890            })
891            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
892            .collect();
893
894        Ok(ActorGraphBuildResult {
895            graph,
896            downstream_fragment_relations,
897            building_locations,
898            existing_locations,
899            upstream_fragment_downstreams,
900            replace_upstream,
901            new_no_shuffle,
902        })
903    }
904
905    /// Build actor graph for each fragment, using topological order.
906    fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
907        let mut state = ActorGraphBuildState::new(id_gen);
908
909        // Use topological sort to build the graph from downstream to upstream. (The first fragment
910        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
911        for fragment_id in self.fragment_graph.topo_order()? {
912            self.build_actor_graph_fragment(fragment_id, &mut state)?;
913        }
914
915        Ok(state.finish())
916    }
917
918    /// Build actor graph for a specific fragment.
919    fn build_actor_graph_fragment(
920        &self,
921        fragment_id: GlobalFragmentId,
922        state: &mut ActorGraphBuildState,
923    ) -> MetaResult<()> {
924        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
925        let distribution = self.get_distribution(fragment_id);
926
927        // First, add or record the actors for the current fragment into the state.
928        let actor_ids = match current_fragment {
929            // For building fragments, we need to generate the actor builders.
930            EitherFragment::Building(current_fragment) => {
931                let node = current_fragment.node.clone().unwrap();
932                state
933                    .inner
934                    .fragment_actor_builders
935                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
936                    .expect("non-duplicate");
937                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
938
939                distribution
940                    .worker_slots()
941                    .map(|worker_slot| {
942                        let actor_id = state.next_actor_id();
943                        let vnode_bitmap = bitmaps
944                            .as_ref()
945                            .map(|m: &HashMap<WorkerSlotId, Bitmap>| &m[&worker_slot])
946                            .cloned();
947
948                        state
949                            .inner
950                            .add_actor((fragment_id, actor_id), worker_slot, vnode_bitmap);
951
952                        actor_id
953                    })
954                    .collect_vec()
955            }
956
957            // For existing fragments, we only need to record the actor locations.
958            EitherFragment::Existing(existing_fragment) => existing_fragment
959                .actors
960                .iter()
961                .map(|a| {
962                    let actor_id = GlobalActorId::new(a.actor_id);
963                    let worker_slot_id = match &distribution {
964                        Distribution::Singleton(worker_slot_id) => *worker_slot_id,
965                        Distribution::Hash(mapping) => mapping
966                            .get_matched(a.vnode_bitmap.as_ref().unwrap())
967                            .unwrap(),
968                    };
969
970                    state
971                        .inner
972                        .record_external_location(actor_id, worker_slot_id);
973
974                    actor_id
975                })
976                .collect_vec(),
977        };
978
979        // Then, add links between the current fragment and its downstream fragments.
980        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
981            let downstream_actors = state
982                .fragment_actors
983                .get(&downstream_fragment_id)
984                .expect("downstream fragment not processed yet");
985
986            state.inner.add_link(
987                FragmentLinkNode {
988                    fragment_id,
989                    actor_ids: &actor_ids,
990                },
991                FragmentLinkNode {
992                    fragment_id: downstream_fragment_id,
993                    actor_ids: downstream_actors,
994                },
995                edge,
996            );
997        }
998
999        // Finally, record the actor IDs for the current fragment.
1000        state
1001            .fragment_actors
1002            .try_insert(fragment_id, actor_ids)
1003            .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1004
1005        Ok(())
1006    }
1007}