risingwave_meta/stream/stream_graph/
actor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::stream_graph_visitor::visit_tables;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::plan_common::ExprContext;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{
29    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
30};
31
32use super::Locations;
33use crate::MetaResult;
34use crate::controller::cluster::StreamingClusterInfo;
35use crate::manager::{MetaSrvEnv, StreamingJob};
36use crate::model::{
37    Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
38    FragmentReplaceUpstream, StreamActor,
39};
40use crate::stream::stream_graph::fragment::{
41    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
42    StreamFragmentEdge,
43};
44use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
45use crate::stream::stream_graph::schedule;
46use crate::stream::stream_graph::schedule::Distribution;
47
48/// [`ActorBuilder`] builds a stream actor in a stream DAG.
49#[derive(Debug)]
50struct ActorBuilder {
51    /// The ID of this actor.
52    actor_id: GlobalActorId,
53
54    /// The fragment ID of this actor.
55    fragment_id: GlobalFragmentId,
56
57    /// The virtual node bitmap, if this fragment is hash distributed.
58    vnode_bitmap: Option<Bitmap>,
59}
60
61impl ActorBuilder {
62    fn new(
63        actor_id: GlobalActorId,
64        fragment_id: GlobalFragmentId,
65        vnode_bitmap: Option<Bitmap>,
66    ) -> Self {
67        Self {
68            actor_id,
69            fragment_id,
70            vnode_bitmap,
71        }
72    }
73}
74
75impl FragmentActorBuilder {
76    /// Rewrite the actor body.
77    ///
78    /// During this process, the following things will be done:
79    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
80    ///    compute nodes.
81    fn rewrite(&self) -> MetaResult<StreamNode> {
82        self.rewrite_inner(&self.node, 0)
83    }
84
85    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
86        match stream_node.get_node_body()? {
87            // Leaf node `Exchange`.
88            NodeBody::Exchange(exchange) => {
89                // The exchange node should always be the bottom of the plan node. If we find one
90                // when the depth is 0, it means that the plan node is not well-formed.
91                if depth == 0 {
92                    bail!(
93                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
94                        stream_node
95                    )
96                }
97                assert!(!stream_node.get_fields().is_empty());
98                assert!(stream_node.input.is_empty());
99
100                // Index the upstreams by the an internal edge ID.
101                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
102                    link_id: stream_node.get_operator_id(),
103                }];
104
105                let upstream_fragment_id = upstream_fragment_id.as_global_id();
106
107                Ok(StreamNode {
108                    node_body: Some(NodeBody::Merge(Box::new({
109                        #[expect(deprecated)]
110                        MergeNode {
111                            upstream_actor_id: vec![],
112                            upstream_fragment_id,
113                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
114                            fields: stream_node.get_fields().clone(),
115                        }
116                    }))),
117                    identity: "MergeExecutor".to_owned(),
118                    ..stream_node.clone()
119                })
120            }
121
122            // "Leaf" node `StreamScan`.
123            NodeBody::StreamScan(stream_scan) => {
124                let input = stream_node.get_input();
125                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
126                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
127                    // and always reads from log store.
128                    assert!(input.is_empty());
129                    return Ok(stream_node.clone());
130                }
131                assert_eq!(input.len(), 2);
132
133                let merge_node = &input[0];
134                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
135                let batch_plan_node = &input[1];
136                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
137
138                // Index the upstreams by the an external edge ID.
139                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
140                    [&EdgeId::UpstreamExternal {
141                        upstream_table_id: stream_scan.table_id.into(),
142                        downstream_fragment_id: self.fragment_id,
143                    }];
144
145                let is_shuffled_backfill = stream_scan.stream_scan_type
146                    == StreamScanType::ArrangementBackfill as i32
147                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
148                if !is_shuffled_backfill {
149                    assert!(upstream_no_shuffle_actor.is_some());
150                }
151
152                let upstream_dispatcher_type = if is_shuffled_backfill {
153                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
154                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
155                    DispatcherType::Hash as _
156                } else {
157                    DispatcherType::NoShuffle as _
158                };
159
160                let upstream_fragment_id = upstream_fragment_id.as_global_id();
161
162                let input = vec![
163                    // Fill the merge node body with correct upstream info.
164                    StreamNode {
165                        node_body: Some(NodeBody::Merge(Box::new({
166                            #[expect(deprecated)]
167                            MergeNode {
168                                upstream_actor_id: vec![],
169                                upstream_fragment_id,
170                                upstream_dispatcher_type,
171                                fields: merge_node.fields.clone(),
172                            }
173                        }))),
174                        ..merge_node.clone()
175                    },
176                    batch_plan_node.clone(),
177                ];
178
179                Ok(StreamNode {
180                    input,
181                    ..stream_node.clone()
182                })
183            }
184
185            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
186            // cdc_filter -> backfill -> mview
187            // source_backfill -> mview
188            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
189                let input = stream_node.get_input();
190                assert_eq!(input.len(), 1);
191
192                let merge_node = &input[0];
193                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
194
195                let upstream_source_id = match stream_node.get_node_body()? {
196                    NodeBody::CdcFilter(node) => node.upstream_source_id,
197                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
198                    _ => unreachable!(),
199                };
200
201                // Index the upstreams by the an external edge ID.
202                let (upstream_fragment_id, upstream_actors) = &self.upstreams
203                    [&EdgeId::UpstreamExternal {
204                        upstream_table_id: upstream_source_id.into(),
205                        downstream_fragment_id: self.fragment_id,
206                    }];
207
208                assert!(
209                    upstream_actors.is_some(),
210                    "Upstream Cdc Source should be singleton. \
211                    SourceBackfill is NoShuffle 1-1 correspondence. \
212                    So they both should have only one upstream actor."
213                );
214
215                let upstream_fragment_id = upstream_fragment_id.as_global_id();
216
217                // rewrite the input
218                let input = vec![
219                    // Fill the merge node body with correct upstream info.
220                    StreamNode {
221                        node_body: Some(NodeBody::Merge(Box::new({
222                            #[expect(deprecated)]
223                            MergeNode {
224                                upstream_actor_id: vec![],
225                                upstream_fragment_id,
226                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
227                                fields: merge_node.fields.clone(),
228                            }
229                        }))),
230                        ..merge_node.clone()
231                    },
232                ];
233                Ok(StreamNode {
234                    input,
235                    ..stream_node.clone()
236                })
237            }
238
239            // For other nodes, visit the children recursively.
240            _ => {
241                let mut new_stream_node = stream_node.clone();
242                for (input, new_input) in stream_node
243                    .input
244                    .iter()
245                    .zip_eq_fast(&mut new_stream_node.input)
246                {
247                    *new_input = self.rewrite_inner(input, depth + 1)?;
248                }
249                Ok(new_stream_node)
250            }
251        }
252    }
253}
254
255impl ActorBuilder {
256    /// Build an actor after all the upstreams and downstreams are processed.
257    fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
258        // Only fill the definition when debug assertions enabled, otherwise use name instead.
259        #[cfg(not(debug_assertions))]
260        let mview_definition = job.name();
261        #[cfg(debug_assertions)]
262        let mview_definition = job.definition();
263
264        Ok(StreamActor {
265            actor_id: self.actor_id.as_global_id(),
266            fragment_id: self.fragment_id.as_global_id(),
267            vnode_bitmap: self.vnode_bitmap,
268            mview_definition,
269            expr_context: Some(expr_context),
270        })
271    }
272}
273
274/// The required changes to an existing external actor to build the graph of a streaming job.
275///
276/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
277/// to the upstream actors, by adding new dispatchers.
278#[derive(Default)]
279struct UpstreamFragmentChange {
280    /// The new downstreams to be added.
281    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
282}
283
284#[derive(Default)]
285struct DownstreamFragmentChange {
286    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
287    /// `edge_id` -> new upstream fragment id
288    new_upstreams:
289        HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
290}
291
292impl UpstreamFragmentChange {
293    /// Add a dispatcher to the external actor.
294    fn add_dispatcher(
295        &mut self,
296        downstream_fragment_id: GlobalFragmentId,
297        dispatch: DispatchStrategy,
298    ) {
299        self.new_downstreams
300            .try_insert(downstream_fragment_id, dispatch)
301            .unwrap();
302    }
303}
304
305impl DownstreamFragmentChange {
306    /// Add an upstream to the external actor.
307    fn add_upstream(
308        &mut self,
309        edge_id: DownstreamExternalEdgeId,
310        new_upstream_fragment_id: GlobalFragmentId,
311        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
312    ) {
313        self.new_upstreams
314            .try_insert(
315                edge_id,
316                (new_upstream_fragment_id, no_shuffle_actor_mapping),
317            )
318            .unwrap();
319    }
320}
321
322/// The location of actors.
323type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
324// no_shuffle upstream actor_id -> actor_id
325type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
326
327#[derive(Debug)]
328struct FragmentActorBuilder {
329    fragment_id: GlobalFragmentId,
330    node: StreamNode,
331    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
332    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
333    // edge_id -> (upstream fragment_id, no shuffle actor pairs if it's no shuffle dispatched)
334    upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
335}
336
337impl FragmentActorBuilder {
338    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
339        Self {
340            fragment_id,
341            node,
342            actor_builders: Default::default(),
343            downstreams: Default::default(),
344            upstreams: Default::default(),
345        }
346    }
347}
348
349/// The actual mutable state of building an actor graph.
350///
351/// When the fragments are visited in a topological order, actor builders will be added to this
352/// state and the scheduled locations will be added. As the building process is run on the
353/// **complete graph** which also contains the info of the existing (external) fragments, the info
354/// of them will be also recorded.
355#[derive(Default)]
356struct ActorGraphBuildStateInner {
357    /// The builders of the actors to be built.
358    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,
359
360    /// The scheduled locations of the actors to be built.
361    building_locations: ActorLocations,
362
363    /// The required changes to the external downstream fragment. See [`DownstreamFragmentChange`].
364    /// Indexed by the `fragment_id` of fragments that have updates on its downstream.
365    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,
366
367    /// The required changes to the external upstream fragment. See [`UpstreamFragmentChange`].
368    /// /// Indexed by the `fragment_id` of fragments that have updates on its upstream.
369    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,
370
371    /// The actual locations of the external actors.
372    external_locations: ActorLocations,
373}
374
375/// The information of a fragment, used for parameter passing for `Inner::add_link`.
376struct FragmentLinkNode<'a> {
377    fragment_id: GlobalFragmentId,
378    actor_ids: &'a [GlobalActorId],
379}
380
381impl ActorGraphBuildStateInner {
382    /// Insert new generated actor and record its location.
383    ///
384    /// The `vnode_bitmap` should be `Some` for the actors of hash-distributed fragments.
385    fn add_actor(
386        &mut self,
387        (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
388        actor_alignment_id: ActorAlignmentId,
389        vnode_bitmap: Option<Bitmap>,
390    ) {
391        self.fragment_actor_builders
392            .get_mut(&fragment_id)
393            .expect("should added previously")
394            .actor_builders
395            .try_insert(
396                actor_id,
397                ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
398            )
399            .unwrap();
400
401        self.building_locations
402            .try_insert(actor_id, actor_alignment_id)
403            .unwrap();
404    }
405
406    /// Record the location of an external actor.
407    fn record_external_location(
408        &mut self,
409        actor_id: GlobalActorId,
410        actor_alignment_id: ActorAlignmentId,
411    ) {
412        self.external_locations
413            .try_insert(actor_id, actor_alignment_id)
414            .unwrap();
415    }
416
417    /// Add the new downstream fragment relation to a fragment.
418    ///
419    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
420    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
421    fn add_dispatcher(
422        &mut self,
423        fragment_id: GlobalFragmentId,
424        downstream_fragment_id: GlobalFragmentId,
425        dispatch: DispatchStrategy,
426    ) {
427        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
428            builder
429                .downstreams
430                .try_insert(downstream_fragment_id, dispatch)
431                .unwrap();
432        } else {
433            self.upstream_fragment_changes
434                .entry(fragment_id)
435                .or_default()
436                .add_dispatcher(downstream_fragment_id, dispatch);
437        }
438    }
439
440    /// Add the new upstream for an actor.
441    ///
442    /// - If the actor is to be built, the upstream will be added to the actor builder.
443    /// - If the actor is an external actor, the upstream will be added to the external changes.
444    fn add_upstream(
445        &mut self,
446        fragment_id: GlobalFragmentId,
447        edge_id: EdgeId,
448        upstream_fragment_id: GlobalFragmentId,
449        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
450    ) {
451        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
452            builder
453                .upstreams
454                .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
455                .unwrap();
456        } else {
457            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
458                unreachable!("edge from internal to external must be `DownstreamExternal`")
459            };
460            self.downstream_fragment_changes
461                .entry(fragment_id)
462                .or_default()
463                .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
464        }
465    }
466
467    /// Get the location of an actor. Will look up the location map of both the actors to be built
468    /// and the external actors.
469    fn get_location(&self, actor_id: GlobalActorId) -> ActorAlignmentId {
470        self.building_locations
471            .get(&actor_id)
472            .copied()
473            .or_else(|| self.external_locations.get(&actor_id).copied())
474            .unwrap()
475    }
476
477    /// Add a "link" between two fragments in the graph.
478    ///
479    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
480    /// based on the distribution and the dispatch strategy. They will be
481    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
482    ///
483    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
484    /// instead of the actor builders.
485    fn add_link<'a>(
486        &mut self,
487        upstream: FragmentLinkNode<'a>,
488        downstream: FragmentLinkNode<'a>,
489        edge: &'a StreamFragmentEdge,
490    ) {
491        let dt = edge.dispatch_strategy.r#type();
492
493        match dt {
494            // For `NoShuffle`, make n "1-1" links between the actors.
495            DispatcherType::NoShuffle => {
496                assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
497                let upstream_locations: HashMap<_, _> = upstream
498                    .actor_ids
499                    .iter()
500                    .map(|id| (self.get_location(*id), *id))
501                    .collect();
502                let downstream_locations: HashMap<_, _> = downstream
503                    .actor_ids
504                    .iter()
505                    .map(|id| (self.get_location(*id), *id))
506                    .collect();
507
508                // Create a new dispatcher just between these two actors.
509                self.add_dispatcher(
510                    upstream.fragment_id,
511                    downstream.fragment_id,
512                    edge.dispatch_strategy.clone(),
513                );
514
515                // Also record the upstream for the downstream actor.
516                self.add_upstream(
517                    downstream.fragment_id,
518                    edge.id,
519                    upstream.fragment_id,
520                    Some(
521                        downstream_locations
522                            .iter()
523                            .map(|(location, downstream_actor_id)| {
524                                let upstream_actor_id = upstream_locations.get(location).unwrap();
525                                (*upstream_actor_id, *downstream_actor_id)
526                            })
527                            .collect(),
528                    ),
529                );
530            }
531
532            // Otherwise, make m * n links between the actors.
533            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
534                self.add_dispatcher(
535                    upstream.fragment_id,
536                    downstream.fragment_id,
537                    edge.dispatch_strategy.clone(),
538                );
539                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
540            }
541
542            DispatcherType::Unspecified => unreachable!(),
543        }
544    }
545}
546
547/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
548struct ActorGraphBuildState {
549    /// The actual state.
550    inner: ActorGraphBuildStateInner,
551
552    /// The actor IDs of each fragment.
553    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,
554
555    /// The next local actor id to use.
556    next_local_id: u32,
557
558    /// The global actor id generator.
559    actor_id_gen: GlobalActorIdGen,
560}
561
562impl ActorGraphBuildState {
563    /// Create an empty state with the given id generator.
564    fn new(actor_id_gen: GlobalActorIdGen) -> Self {
565        Self {
566            inner: Default::default(),
567            fragment_actors: Default::default(),
568            next_local_id: 0,
569            actor_id_gen,
570        }
571    }
572
573    /// Get the next global actor id.
574    fn next_actor_id(&mut self) -> GlobalActorId {
575        let local_id = self.next_local_id;
576        self.next_local_id += 1;
577
578        self.actor_id_gen.to_global_id(local_id)
579    }
580
581    /// Finish the build and return the inner state.
582    fn finish(self) -> ActorGraphBuildStateInner {
583        // Assert that all the actors are built.
584        assert_eq!(self.actor_id_gen.len(), self.next_local_id);
585
586        self.inner
587    }
588}
589
590/// The result of a built actor graph. Will be further embedded into the `Context` for building
591/// actors on the compute nodes.
592pub struct ActorGraphBuildResult {
593    /// The graph of sealed fragments, including all actors.
594    pub graph: BTreeMap<FragmentId, Fragment>,
595    /// The downstream fragments of the fragments from the new graph to be created.
596    pub downstream_fragment_relations: FragmentDownstreamRelation,
597
598    /// The scheduled locations of the actors to be built.
599    pub building_locations: Locations,
600
601    /// The actual locations of the external actors.
602    pub existing_locations: Locations,
603
604    /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
605    pub upstream_fragment_downstreams: FragmentDownstreamRelation,
606
607    /// The updates to be applied to the downstream fragment merge node. Used for schema change (replace
608    /// table plan).
609    pub replace_upstream: FragmentReplaceUpstream,
610
611    /// The new no shuffle added to create the new streaming job, including the no shuffle from existing fragments to
612    /// the newly created fragments, between two newly created fragments, and from newly created fragments to existing
613    /// downstream fragments (for create sink into table and replace table).
614    pub new_no_shuffle: FragmentNewNoShuffle,
615}
616
617/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
618/// current cluster info and the required parallelism.
619#[derive(Debug)]
620pub struct ActorGraphBuilder {
621    /// The pre-scheduled distribution for each building fragment.
622    distributions: HashMap<GlobalFragmentId, Distribution>,
623
624    /// The actual distribution for each existing fragment.
625    existing_distributions: HashMap<GlobalFragmentId, Distribution>,
626
627    /// The complete fragment graph.
628    fragment_graph: CompleteStreamFragmentGraph,
629
630    /// The cluster info for creating a streaming job.
631    cluster_info: StreamingClusterInfo,
632}
633
634impl ActorGraphBuilder {
635    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
636    /// graph is failed to be scheduled.
637    pub fn new(
638        streaming_job_id: u32,
639        resource_group: String,
640        fragment_graph: CompleteStreamFragmentGraph,
641        cluster_info: StreamingClusterInfo,
642        default_parallelism: NonZeroUsize,
643    ) -> MetaResult<Self> {
644        let expected_vnode_count = fragment_graph.max_parallelism();
645        let existing_distributions = fragment_graph.existing_distribution();
646
647        let schedulable_workers =
648            cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
649
650        let scheduler = schedule::Scheduler::new(
651            streaming_job_id,
652            &schedulable_workers,
653            default_parallelism,
654            expected_vnode_count,
655        )?;
656
657        let distributions = scheduler.schedule(&fragment_graph)?;
658
659        // Fill the vnode count for each internal table, based on schedule result.
660        let mut fragment_graph = fragment_graph;
661        for (id, fragment) in fragment_graph.building_fragments_mut() {
662            let fragment_vnode_count = distributions[id].vnode_count();
663            visit_tables(fragment, |table, _| {
664                // There are special cases where a hash-distributed fragment contains singleton
665                // internal tables, e.g., the state table of `Source` executors.
666                let vnode_count = if table.is_singleton() {
667                    if fragment_vnode_count > 1 {
668                        tracing::info!(
669                            table.name,
670                            "found singleton table in hash-distributed fragment"
671                        );
672                    }
673                    1
674                } else {
675                    fragment_vnode_count
676                };
677                table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf();
678            })
679        }
680
681        Ok(Self {
682            distributions,
683            existing_distributions,
684            fragment_graph,
685            cluster_info,
686        })
687    }
688
689    /// Get the distribution of the given fragment. Will look up the distribution map of both the
690    /// building and existing fragments.
691    fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
692        self.distributions
693            .get(&fragment_id)
694            .or_else(|| self.existing_distributions.get(&fragment_id))
695            .unwrap()
696    }
697
698    /// Convert the actor location map to the [`Locations`] struct.
699    fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
700        let actor_locations = actor_locations
701            .into_iter()
702            .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
703            .collect();
704
705        let worker_locations = self
706            .cluster_info
707            .worker_nodes
708            .iter()
709            .map(|(id, node)| (*id as WorkerId, node.clone()))
710            .collect();
711
712        Locations {
713            actor_locations,
714            worker_locations,
715        }
716    }
717
718    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
719    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
720    pub fn generate_graph(
721        self,
722        env: &MetaSrvEnv,
723        job: &StreamingJob,
724        expr_context: ExprContext,
725    ) -> MetaResult<ActorGraphBuildResult> {
726        // Pre-generate IDs for all actors.
727        let actor_len = self
728            .distributions
729            .values()
730            .map(|d| d.parallelism())
731            .sum::<usize>() as u64;
732        let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len);
733
734        // Build the actor graph and get the final state.
735        let ActorGraphBuildStateInner {
736            fragment_actor_builders,
737            building_locations,
738            downstream_fragment_changes,
739            upstream_fragment_changes,
740            external_locations,
741        } = self.build_actor_graph(id_gen)?;
742
743        for alignment_id in external_locations.values() {
744            if self
745                .cluster_info
746                .unschedulable_workers
747                .contains(&alignment_id.worker_id())
748            {
749                bail!(
750                    "The worker {} where the associated upstream is located is unschedulable",
751                    alignment_id.worker_id(),
752                );
753            }
754        }
755
756        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
757        let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
758        // Serialize the graph into a map of sealed fragments.
759        let graph = {
760            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
761                HashMap::new();
762
763            // As all fragments are processed, we can now `build` the actors where the `Exchange`
764            // and `Chain` are rewritten.
765            for (fragment_id, builder) in fragment_actor_builders {
766                let global_fragment_id = fragment_id.as_global_id();
767                let node = builder.rewrite()?;
768                for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
769                    if let Some(no_shuffle_upstream) = no_shuffle_upstream {
770                        new_no_shuffle
771                            .entry(upstream_fragment_id.as_global_id())
772                            .or_default()
773                            .try_insert(
774                                global_fragment_id,
775                                no_shuffle_upstream
776                                    .iter()
777                                    .map(|(upstream_actor_id, actor_id)| {
778                                        (upstream_actor_id.as_global_id(), actor_id.as_global_id())
779                                    })
780                                    .collect(),
781                            )
782                            .expect("non-duplicate");
783                    }
784                }
785                downstream_fragment_relations
786                    .try_insert(
787                        global_fragment_id,
788                        builder
789                            .downstreams
790                            .into_iter()
791                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
792                            .collect(),
793                    )
794                    .expect("non-duplicate");
795                fragment_actors
796                    .try_insert(
797                        fragment_id,
798                        (
799                            node,
800                            builder
801                                .actor_builders
802                                .into_values()
803                                .map(|builder| builder.build(job, expr_context.clone()))
804                                .try_collect()?,
805                        ),
806                    )
807                    .expect("non-duplicate");
808            }
809
810            {
811                fragment_actors
812                    .into_iter()
813                    .map(|(fragment_id, (stream_node, actors))| {
814                        let distribution = self.distributions[&fragment_id].clone();
815                        let fragment = self.fragment_graph.seal_fragment(
816                            fragment_id,
817                            actors,
818                            distribution,
819                            stream_node,
820                        );
821                        let fragment_id = fragment_id.as_global_id();
822                        (fragment_id, fragment)
823                    })
824                    .collect()
825            }
826        };
827
828        // Convert the actor location map to the `Locations` struct.
829        let building_locations = self.build_locations(building_locations);
830        let existing_locations = self.build_locations(external_locations);
831
832        // Extract the new fragment relation from the external changes.
833        let upstream_fragment_downstreams = upstream_fragment_changes
834            .into_iter()
835            .map(|(fragment_id, changes)| {
836                (
837                    fragment_id.as_global_id(),
838                    changes
839                        .new_downstreams
840                        .into_iter()
841                        .map(|(downstream_fragment_id, new_dispatch)| {
842                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
843                        })
844                        .collect(),
845                )
846            })
847            .collect();
848
849        // Extract the updates for merge executors from the external changes.
850        let replace_upstream = downstream_fragment_changes
851            .into_iter()
852            .map(|(fragment_id, changes)| {
853                let fragment_id = fragment_id.as_global_id();
854                let new_no_shuffle = &mut new_no_shuffle;
855                (
856                    fragment_id,
857                    changes
858                        .new_upstreams
859                        .into_iter()
860                        .map(
861                            move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
862                                let upstream_fragment_id = upstream_fragment_id.as_global_id();
863                                if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
864                                    && !upstream_new_no_shuffle.is_empty()
865                                {
866                                    let no_shuffle_actors = new_no_shuffle
867                                        .entry(upstream_fragment_id)
868                                        .or_default()
869                                        .entry(fragment_id)
870                                        .or_default();
871                                    no_shuffle_actors.extend(
872                                        upstream_new_no_shuffle.into_iter().map(
873                                            |(upstream_actor_id, actor_id)| {
874                                                (
875                                                    upstream_actor_id.as_global_id(),
876                                                    actor_id.as_global_id(),
877                                                )
878                                            },
879                                        ),
880                                    );
881                                }
882                                let DownstreamExternalEdgeId {
883                                    original_upstream_fragment_id,
884                                    ..
885                                } = edge_id;
886                                (
887                                    original_upstream_fragment_id.as_global_id(),
888                                    upstream_fragment_id,
889                                )
890                            },
891                        )
892                        .collect(),
893                )
894            })
895            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
896            .collect();
897
898        Ok(ActorGraphBuildResult {
899            graph,
900            downstream_fragment_relations,
901            building_locations,
902            existing_locations,
903            upstream_fragment_downstreams,
904            replace_upstream,
905            new_no_shuffle,
906        })
907    }
908
909    /// Build actor graph for each fragment, using topological order.
910    fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
911        let mut state = ActorGraphBuildState::new(id_gen);
912
913        // Use topological sort to build the graph from downstream to upstream. (The first fragment
914        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
915        for fragment_id in self.fragment_graph.topo_order()? {
916            self.build_actor_graph_fragment(fragment_id, &mut state)?;
917        }
918
919        Ok(state.finish())
920    }
921
922    /// Build actor graph for a specific fragment.
923    fn build_actor_graph_fragment(
924        &self,
925        fragment_id: GlobalFragmentId,
926        state: &mut ActorGraphBuildState,
927    ) -> MetaResult<()> {
928        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
929        let distribution = self.get_distribution(fragment_id);
930
931        // First, add or record the actors for the current fragment into the state.
932        let actor_ids = match current_fragment {
933            // For building fragments, we need to generate the actor builders.
934            EitherFragment::Building(current_fragment) => {
935                let node = current_fragment.node.clone().unwrap();
936                state
937                    .inner
938                    .fragment_actor_builders
939                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
940                    .expect("non-duplicate");
941                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
942
943                distribution
944                    .actors()
945                    .map(|alignment_id| {
946                        let actor_id = state.next_actor_id();
947                        let vnode_bitmap = bitmaps
948                            .as_ref()
949                            .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
950                            .cloned();
951
952                        state
953                            .inner
954                            .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
955
956                        actor_id
957                    })
958                    .collect_vec()
959            }
960
961            // For existing fragments, we only need to record the actor locations.
962            EitherFragment::Existing(existing_fragment) => existing_fragment
963                .actors
964                .iter()
965                .map(|a| {
966                    let actor_id = GlobalActorId::new(a.actor_id);
967                    let alignment_id = match &distribution {
968                        Distribution::Singleton(worker_id) => {
969                            ActorAlignmentId::new_single(*worker_id as u32)
970                        }
971                        Distribution::Hash(mapping) => mapping
972                            .get_matched(a.vnode_bitmap.as_ref().unwrap())
973                            .unwrap(),
974                    };
975
976                    state.inner.record_external_location(actor_id, alignment_id);
977
978                    actor_id
979                })
980                .collect_vec(),
981        };
982
983        // Then, add links between the current fragment and its downstream fragments.
984        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
985            let downstream_actors = state
986                .fragment_actors
987                .get(&downstream_fragment_id)
988                .expect("downstream fragment not processed yet");
989
990            state.inner.add_link(
991                FragmentLinkNode {
992                    fragment_id,
993                    actor_ids: &actor_ids,
994                },
995                FragmentLinkNode {
996                    fragment_id: downstream_fragment_id,
997                    actor_ids: downstream_actors,
998                },
999                edge,
1000            );
1001        }
1002
1003        // Finally, record the actor IDs for the current fragment.
1004        state
1005            .fragment_actors
1006            .try_insert(fragment_id, actor_ids)
1007            .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1008
1009        Ok(())
1010    }
1011}