risingwave_meta/stream/stream_graph/
actor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount, VnodeCountCompat};
23use risingwave_common::id::JobId;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::stream_graph_visitor::visit_tables;
26use risingwave_meta_model::WorkerId;
27use risingwave_pb::plan_common::ExprContext;
28use risingwave_pb::stream_plan::stream_node::NodeBody;
29use risingwave_pb::stream_plan::{
30    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
31};
32
33use super::Locations;
34use crate::MetaResult;
35use crate::controller::cluster::StreamingClusterInfo;
36use crate::manager::{MetaSrvEnv, StreamingJob};
37use crate::model::{
38    Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
39    FragmentReplaceUpstream, StreamActor,
40};
41use crate::stream::stream_graph::fragment::{
42    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
43    StreamFragmentEdge,
44};
45use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
46use crate::stream::stream_graph::schedule;
47use crate::stream::stream_graph::schedule::Distribution;
48
/// [`ActorBuilder`] builds a stream actor in a stream DAG.
///
/// It only records the identity of the actor and its vnode bitmap; the final [`StreamActor`] is
/// produced by [`ActorBuilder::build`].
#[derive(Debug)]
struct ActorBuilder {
    /// The ID of this actor.
    actor_id: GlobalActorId,

    /// The ID of the fragment this actor belongs to.
    fragment_id: GlobalFragmentId,

    /// The virtual node bitmap, if this fragment is hash distributed.
    vnode_bitmap: Option<Bitmap>,
}
61
62impl ActorBuilder {
63    fn new(
64        actor_id: GlobalActorId,
65        fragment_id: GlobalFragmentId,
66        vnode_bitmap: Option<Bitmap>,
67    ) -> Self {
68        Self {
69            actor_id,
70            fragment_id,
71            vnode_bitmap,
72        }
73    }
74}
75
76impl FragmentActorBuilder {
77    /// Rewrite the actor body.
78    ///
79    /// During this process, the following things will be done:
80    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
81    ///    compute nodes.
82    fn rewrite(&self) -> MetaResult<StreamNode> {
83        self.rewrite_inner(&self.node, 0)
84    }
85
86    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
87        match stream_node.get_node_body()? {
88            // Leaf node `Exchange`.
89            NodeBody::Exchange(exchange) => {
90                // The exchange node should always be the bottom of the plan node. If we find one
91                // when the depth is 0, it means that the plan node is not well-formed.
92                if depth == 0 {
93                    bail!(
94                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
95                        stream_node
96                    )
97                }
98                assert!(!stream_node.get_fields().is_empty());
99                assert!(stream_node.input.is_empty());
100
101                // Index the upstreams by the an internal edge ID.
102                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
103                    link_id: stream_node.get_operator_id(),
104                }];
105
106                let upstream_fragment_id = upstream_fragment_id.as_global_id();
107
108                Ok(StreamNode {
109                    node_body: Some(NodeBody::Merge(Box::new({
110                        MergeNode {
111                            upstream_fragment_id,
112                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
113                            ..Default::default()
114                        }
115                    }))),
116                    identity: "MergeExecutor".to_owned(),
117                    ..stream_node.clone()
118                })
119            }
120
121            // "Leaf" node `StreamScan`.
122            NodeBody::StreamScan(stream_scan) => {
123                let input = stream_node.get_input();
124                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
125                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
126                    // and always reads from log store.
127                    assert!(input.is_empty());
128                    return Ok(stream_node.clone());
129                }
130                assert_eq!(input.len(), 2);
131
132                let merge_node = &input[0];
133                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
134                let batch_plan_node = &input[1];
135                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
136
137                // Index the upstreams by the an external edge ID.
138                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
139                    [&EdgeId::UpstreamExternal {
140                        upstream_table_id: stream_scan.table_id.into(),
141                        downstream_fragment_id: self.fragment_id,
142                    }];
143
144                let is_shuffled_backfill = stream_scan.stream_scan_type
145                    == StreamScanType::ArrangementBackfill as i32
146                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
147                if !is_shuffled_backfill {
148                    assert!(upstream_no_shuffle_actor.is_some());
149                }
150
151                let upstream_dispatcher_type = if is_shuffled_backfill {
152                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
153                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
154                    DispatcherType::Hash as _
155                } else {
156                    DispatcherType::NoShuffle as _
157                };
158
159                let upstream_fragment_id = upstream_fragment_id.as_global_id();
160
161                let input = vec![
162                    // Fill the merge node body with correct upstream info.
163                    StreamNode {
164                        node_body: Some(NodeBody::Merge(Box::new({
165                            MergeNode {
166                                upstream_fragment_id,
167                                upstream_dispatcher_type,
168                                ..Default::default()
169                            }
170                        }))),
171                        ..merge_node.clone()
172                    },
173                    batch_plan_node.clone(),
174                ];
175
176                Ok(StreamNode {
177                    input,
178                    ..stream_node.clone()
179                })
180            }
181
182            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
183            // cdc_filter -> backfill -> mview
184            // source_backfill -> mview
185            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
186                let input = stream_node.get_input();
187                assert_eq!(input.len(), 1);
188
189                let merge_node = &input[0];
190                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
191
192                let upstream_source_id = match stream_node.get_node_body()? {
193                    NodeBody::CdcFilter(node) => node.upstream_source_id,
194                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
195                    _ => unreachable!(),
196                };
197
198                // Index the upstreams by the an external edge ID.
199                let (upstream_fragment_id, upstream_actors) = &self.upstreams
200                    [&EdgeId::UpstreamExternal {
201                        upstream_table_id: upstream_source_id.into(),
202                        downstream_fragment_id: self.fragment_id,
203                    }];
204
205                assert!(
206                    upstream_actors.is_some(),
207                    "Upstream Cdc Source should be singleton. \
208                    SourceBackfill is NoShuffle 1-1 correspondence. \
209                    So they both should have only one upstream actor."
210                );
211
212                let upstream_fragment_id = upstream_fragment_id.as_global_id();
213
214                // rewrite the input
215                let input = vec![
216                    // Fill the merge node body with correct upstream info.
217                    StreamNode {
218                        node_body: Some(NodeBody::Merge(Box::new({
219                            MergeNode {
220                                upstream_fragment_id,
221                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
222                                ..Default::default()
223                            }
224                        }))),
225                        ..merge_node.clone()
226                    },
227                ];
228                Ok(StreamNode {
229                    input,
230                    ..stream_node.clone()
231                })
232            }
233
234            // For other nodes, visit the children recursively.
235            _ => {
236                let mut new_stream_node = stream_node.clone();
237                for (input, new_input) in stream_node
238                    .input
239                    .iter()
240                    .zip_eq_fast(&mut new_stream_node.input)
241                {
242                    *new_input = self.rewrite_inner(input, depth + 1)?;
243                }
244                Ok(new_stream_node)
245            }
246        }
247    }
248}
249
250impl ActorBuilder {
251    /// Build an actor after all the upstreams and downstreams are processed.
252    fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
253        // Only fill the definition when debug assertions enabled, otherwise use name instead.
254        #[cfg(not(debug_assertions))]
255        let mview_definition = job.name();
256        #[cfg(debug_assertions)]
257        let mview_definition = job.definition();
258
259        Ok(StreamActor {
260            actor_id: self.actor_id.as_global_id(),
261            fragment_id: self.fragment_id.as_global_id(),
262            vnode_bitmap: self.vnode_bitmap,
263            mview_definition,
264            expr_context: Some(expr_context),
265        })
266    }
267}
268
/// The required changes to an existing external upstream fragment to build the graph of a
/// streaming job.
///
/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
/// to the upstream actors, by adding new dispatchers.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The new downstreams to be added: downstream `fragment_id` -> the dispatch strategy used
    /// to reach that fragment.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
278
/// The required changes to an existing external downstream fragment to build the graph of a
/// streaming job, i.e., the upstreams it has to be connected to.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
    /// `edge_id` -> (new upstream fragment id, no-shuffle actor pairs if they were recorded)
    new_upstreams:
        HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
286
287impl UpstreamFragmentChange {
288    /// Add a dispatcher to the external actor.
289    fn add_dispatcher(
290        &mut self,
291        downstream_fragment_id: GlobalFragmentId,
292        dispatch: DispatchStrategy,
293    ) {
294        self.new_downstreams
295            .try_insert(downstream_fragment_id, dispatch)
296            .unwrap();
297    }
298}
299
300impl DownstreamFragmentChange {
301    /// Add an upstream to the external actor.
302    fn add_upstream(
303        &mut self,
304        edge_id: DownstreamExternalEdgeId,
305        new_upstream_fragment_id: GlobalFragmentId,
306        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
307    ) {
308        self.new_upstreams
309            .try_insert(
310                edge_id,
311                (new_upstream_fragment_id, no_shuffle_actor_mapping),
312            )
313            .unwrap();
314    }
315}
316
/// The location of actors.
type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
/// The actor pairs of a no-shuffle edge: no_shuffle upstream actor_id -> actor_id.
type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
321
/// The builder state of one fragment to be built: its stream node, the builders of its actors,
/// and the relations to other fragments recorded while linking the graph.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The ID of the fragment being built.
    fragment_id: GlobalFragmentId,
    /// The stream node of the fragment, which is the input of the rewrite.
    node: StreamNode,
    /// The builders of the actors of this fragment, indexed by the actor ID.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// downstream fragment_id -> the dispatch strategy towards that fragment
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    // edge_id -> (upstream fragment_id, no shuffle actor pairs if it's no shuffle dispatched)
    upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
331
332impl FragmentActorBuilder {
333    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
334        Self {
335            fragment_id,
336            node,
337            actor_builders: Default::default(),
338            downstreams: Default::default(),
339            upstreams: Default::default(),
340        }
341    }
342}
343
/// The actual mutable state of building an actor graph.
///
/// When the fragments are visited in a topological order, actor builders will be added to this
/// state and the scheduled locations will be added. As the building process is run on the
/// **complete graph** which also contains the info of the existing (external) fragments, the info
/// of them will be also recorded.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the actors to be built.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The scheduled locations of the actors to be built.
    building_locations: ActorLocations,

    /// The required changes to the external downstream fragment. See [`DownstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external downstream fragment that gets new upstreams.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The required changes to the external upstream fragment. See [`UpstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external upstream fragment that gets new downstreams.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// The actual locations of the external actors.
    external_locations: ActorLocations,
}
369
/// The information of a fragment, used for parameter passing for
/// [`ActorGraphBuildStateInner::add_link`].
struct FragmentLinkNode<'a> {
    /// The ID of the fragment at this end of the link.
    fragment_id: GlobalFragmentId,
    /// The IDs of the actors of the fragment.
    actor_ids: &'a [GlobalActorId],
}
375
376impl ActorGraphBuildStateInner {
377    /// Insert new generated actor and record its location.
378    ///
379    /// The `vnode_bitmap` should be `Some` for the actors of hash-distributed fragments.
380    fn add_actor(
381        &mut self,
382        (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
383        actor_alignment_id: ActorAlignmentId,
384        vnode_bitmap: Option<Bitmap>,
385    ) {
386        self.fragment_actor_builders
387            .get_mut(&fragment_id)
388            .expect("should added previously")
389            .actor_builders
390            .try_insert(
391                actor_id,
392                ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
393            )
394            .unwrap();
395
396        self.building_locations
397            .try_insert(actor_id, actor_alignment_id)
398            .unwrap();
399    }
400
401    /// Record the location of an external actor.
402    fn record_external_location(
403        &mut self,
404        actor_id: GlobalActorId,
405        actor_alignment_id: ActorAlignmentId,
406    ) {
407        self.external_locations
408            .try_insert(actor_id, actor_alignment_id)
409            .unwrap();
410    }
411
412    /// Add the new downstream fragment relation to a fragment.
413    ///
414    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
415    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
416    fn add_dispatcher(
417        &mut self,
418        fragment_id: GlobalFragmentId,
419        downstream_fragment_id: GlobalFragmentId,
420        dispatch: DispatchStrategy,
421    ) {
422        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
423            builder
424                .downstreams
425                .try_insert(downstream_fragment_id, dispatch)
426                .unwrap();
427        } else {
428            self.upstream_fragment_changes
429                .entry(fragment_id)
430                .or_default()
431                .add_dispatcher(downstream_fragment_id, dispatch);
432        }
433    }
434
435    /// Add the new upstream for an actor.
436    ///
437    /// - If the actor is to be built, the upstream will be added to the actor builder.
438    /// - If the actor is an external actor, the upstream will be added to the external changes.
439    fn add_upstream(
440        &mut self,
441        fragment_id: GlobalFragmentId,
442        edge_id: EdgeId,
443        upstream_fragment_id: GlobalFragmentId,
444        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
445    ) {
446        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
447            builder
448                .upstreams
449                .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
450                .unwrap();
451        } else {
452            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
453                unreachable!("edge from internal to external must be `DownstreamExternal`")
454            };
455            self.downstream_fragment_changes
456                .entry(fragment_id)
457                .or_default()
458                .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
459        }
460    }
461
462    /// Get the location of an actor. Will look up the location map of both the actors to be built
463    /// and the external actors.
464    fn get_location(&self, actor_id: GlobalActorId) -> ActorAlignmentId {
465        self.building_locations
466            .get(&actor_id)
467            .copied()
468            .or_else(|| self.external_locations.get(&actor_id).copied())
469            .unwrap()
470    }
471
472    /// Add a "link" between two fragments in the graph.
473    ///
474    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
475    /// based on the distribution and the dispatch strategy. They will be
476    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
477    ///
478    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
479    /// instead of the actor builders.
480    fn add_link<'a>(
481        &mut self,
482        upstream: FragmentLinkNode<'a>,
483        downstream: FragmentLinkNode<'a>,
484        edge: &'a StreamFragmentEdge,
485    ) {
486        let dt = edge.dispatch_strategy.r#type();
487
488        match dt {
489            // For `NoShuffle`, make n "1-1" links between the actors.
490            DispatcherType::NoShuffle => {
491                assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
492                let upstream_locations: HashMap<_, _> = upstream
493                    .actor_ids
494                    .iter()
495                    .map(|id| (self.get_location(*id), *id))
496                    .collect();
497                let downstream_locations: HashMap<_, _> = downstream
498                    .actor_ids
499                    .iter()
500                    .map(|id| (self.get_location(*id), *id))
501                    .collect();
502
503                // Create a new dispatcher just between these two actors.
504                self.add_dispatcher(
505                    upstream.fragment_id,
506                    downstream.fragment_id,
507                    edge.dispatch_strategy.clone(),
508                );
509
510                // Also record the upstream for the downstream actor.
511                self.add_upstream(
512                    downstream.fragment_id,
513                    edge.id,
514                    upstream.fragment_id,
515                    Some(
516                        downstream_locations
517                            .iter()
518                            .map(|(location, downstream_actor_id)| {
519                                let upstream_actor_id = upstream_locations.get(location).unwrap();
520                                (*upstream_actor_id, *downstream_actor_id)
521                            })
522                            .collect(),
523                    ),
524                );
525            }
526
527            // Otherwise, make m * n links between the actors.
528            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
529                self.add_dispatcher(
530                    upstream.fragment_id,
531                    downstream.fragment_id,
532                    edge.dispatch_strategy.clone(),
533                );
534                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
535            }
536
537            DispatcherType::Unspecified => unreachable!(),
538        }
539    }
540}
541
/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
struct ActorGraphBuildState {
    /// The actual state.
    inner: ActorGraphBuildStateInner,

    /// The actor IDs of each fragment.
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// The next local actor id to use. It is converted to a global actor id through
    /// `actor_id_gen` and increased by one each time an actor id is requested.
    next_local_id: u32,

    /// The global actor id generator.
    actor_id_gen: GlobalActorIdGen,
}
556
557impl ActorGraphBuildState {
558    /// Create an empty state with the given id generator.
559    fn new(actor_id_gen: GlobalActorIdGen) -> Self {
560        Self {
561            inner: Default::default(),
562            fragment_actors: Default::default(),
563            next_local_id: 0,
564            actor_id_gen,
565        }
566    }
567
568    /// Get the next global actor id.
569    fn next_actor_id(&mut self) -> GlobalActorId {
570        let local_id = self.next_local_id;
571        self.next_local_id += 1;
572
573        self.actor_id_gen.to_global_id(local_id)
574    }
575
576    /// Finish the build and return the inner state.
577    fn finish(self) -> ActorGraphBuildStateInner {
578        // Assert that all the actors are built.
579        assert_eq!(self.actor_id_gen.len(), self.next_local_id);
580
581        self.inner
582    }
583}
584
/// The result of a built actor graph. Will be further embedded into the `Context` for building
/// actors on the compute nodes.
pub struct ActorGraphBuildResult {
    /// The graph of sealed fragments, including all actors, indexed by the fragment ID.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream fragments of the fragments from the new graph to be created.
    /// Including the fragment relation to external downstream fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The scheduled locations of the actors to be built.
    pub building_locations: Locations,

    /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The updates to be applied to the downstream fragment merge node. Used for schema change (replace
    /// table plan).
    pub replace_upstream: FragmentReplaceUpstream,

    /// The new no-shuffle relations added to create the new streaming job, including the ones from existing
    /// fragments to the newly created fragments, between two newly created fragments, and from newly created
    /// fragments to existing downstream fragments (for create sink into table and replace table).
    pub new_no_shuffle: FragmentNewNoShuffle,
}
609
/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
/// current cluster info and the required parallelism.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The pre-scheduled distribution for each building fragment, as returned by the scheduler.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The actual distribution for each existing fragment, as reported by the fragment graph.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph.
    fragment_graph: CompleteStreamFragmentGraph,

    /// The cluster info for creating a streaming job.
    cluster_info: StreamingClusterInfo,
}
626
627impl ActorGraphBuilder {
628    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
629    /// graph is failed to be scheduled.
630    pub fn new(
631        streaming_job_id: JobId,
632        resource_group: String,
633        fragment_graph: CompleteStreamFragmentGraph,
634        cluster_info: StreamingClusterInfo,
635        default_parallelism: NonZeroUsize,
636    ) -> MetaResult<Self> {
637        let expected_vnode_count = fragment_graph.max_parallelism();
638        let existing_distributions = fragment_graph.existing_distribution();
639
640        let schedulable_workers =
641            cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
642
643        let scheduler = schedule::Scheduler::new(
644            streaming_job_id,
645            &schedulable_workers,
646            default_parallelism,
647            expected_vnode_count,
648        )?;
649
650        let distributions = scheduler.schedule(&fragment_graph)?;
651
652        // Fill the vnode count for each internal table, based on schedule result.
653        let mut fragment_graph = fragment_graph;
654        for (id, fragment) in fragment_graph.building_fragments_mut() {
655            let mut error = None;
656            let fragment_vnode_count = distributions[id].vnode_count();
657            visit_tables(fragment, |table, _| {
658                if error.is_some() {
659                    return;
660                }
661                // There are special cases where a hash-distributed fragment contains singleton
662                // internal tables, e.g., the state table of `Source` executors.
663                let vnode_count = if table.is_singleton() {
664                    if fragment_vnode_count > 1 {
665                        tracing::info!(
666                            table.name,
667                            "found singleton table in hash-distributed fragment"
668                        );
669                    }
670                    1
671                } else {
672                    fragment_vnode_count
673                };
674                match table.vnode_count_inner().value_opt() {
675                    // Vnode count of this table is not set to placeholder, meaning that we are replacing
676                    // a streaming job, and the existing state table requires a specific vnode count.
677                    // Check if it's the same with what we derived from the schedule result.
678                    //
679                    // Typically, inconsistency should not happen as we force to align the vnode count
680                    // when planning the new streaming job in the frontend.
681                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
682                        error = Some(format!(
683                            "failed to align vnode count for table {}({}): required {}, but got {}",
684                            table.id, table.name, required_vnode_count, vnode_count
685                        ));
686                    }
687                    // Normal cases.
688                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
689                }
690            });
691            if let Some(error) = error {
692                bail!(error);
693            }
694        }
695
696        Ok(Self {
697            distributions,
698            existing_distributions,
699            fragment_graph,
700            cluster_info,
701        })
702    }
703
704    /// Get the distribution of the given fragment. Will look up the distribution map of both the
705    /// building and existing fragments.
706    fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
707        self.distributions
708            .get(&fragment_id)
709            .or_else(|| self.existing_distributions.get(&fragment_id))
710            .unwrap()
711    }
712
713    /// Convert the actor location map to the [`Locations`] struct.
714    fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
715        let actor_locations = actor_locations
716            .into_iter()
717            .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
718            .collect();
719
720        let worker_locations = self
721            .cluster_info
722            .worker_nodes
723            .iter()
724            .map(|(id, node)| (*id as WorkerId, node.clone()))
725            .collect();
726
727        Locations {
728            actor_locations,
729            worker_locations,
730        }
731    }
732
733    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
734    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
735    pub fn generate_graph(
736        self,
737        env: &MetaSrvEnv,
738        job: &StreamingJob,
739        expr_context: ExprContext,
740    ) -> MetaResult<ActorGraphBuildResult> {
741        // Pre-generate IDs for all actors.
742        let actor_len = self
743            .distributions
744            .values()
745            .map(|d| d.parallelism())
746            .sum::<usize>() as u64;
747        let id_gen = GlobalActorIdGen::new(env.actor_id_generator(), actor_len);
748
749        // Build the actor graph and get the final state.
750        let ActorGraphBuildStateInner {
751            fragment_actor_builders,
752            building_locations,
753            downstream_fragment_changes,
754            upstream_fragment_changes,
755            external_locations,
756        } = self.build_actor_graph(id_gen)?;
757
758        for alignment_id in external_locations.values() {
759            if self
760                .cluster_info
761                .unschedulable_workers
762                .contains(&alignment_id.worker_id())
763            {
764                bail!(
765                    "The worker {} where the associated upstream is located is unschedulable",
766                    alignment_id.worker_id(),
767                );
768            }
769        }
770
771        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
772        let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
773        // Serialize the graph into a map of sealed fragments.
774        let graph = {
775            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
776                HashMap::new();
777
778            // As all fragments are processed, we can now `build` the actors where the `Exchange`
779            // and `Chain` are rewritten.
780            for (fragment_id, builder) in fragment_actor_builders {
781                let global_fragment_id = fragment_id.as_global_id();
782                let node = builder.rewrite()?;
783                for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
784                    if let Some(no_shuffle_upstream) = no_shuffle_upstream {
785                        new_no_shuffle
786                            .entry(upstream_fragment_id.as_global_id())
787                            .or_default()
788                            .try_insert(
789                                global_fragment_id,
790                                no_shuffle_upstream
791                                    .iter()
792                                    .map(|(upstream_actor_id, actor_id)| {
793                                        (upstream_actor_id.as_global_id(), actor_id.as_global_id())
794                                    })
795                                    .collect(),
796                            )
797                            .expect("non-duplicate");
798                    }
799                }
800                downstream_fragment_relations
801                    .try_insert(
802                        global_fragment_id,
803                        builder
804                            .downstreams
805                            .into_iter()
806                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
807                            .collect(),
808                    )
809                    .expect("non-duplicate");
810                fragment_actors
811                    .try_insert(
812                        fragment_id,
813                        (
814                            node,
815                            builder
816                                .actor_builders
817                                .into_values()
818                                .map(|builder| builder.build(job, expr_context.clone()))
819                                .try_collect()?,
820                        ),
821                    )
822                    .expect("non-duplicate");
823            }
824
825            {
826                fragment_actors
827                    .into_iter()
828                    .map(|(fragment_id, (stream_node, actors))| {
829                        let distribution = self.distributions[&fragment_id].clone();
830                        let fragment = self.fragment_graph.seal_fragment(
831                            fragment_id,
832                            actors,
833                            distribution,
834                            stream_node,
835                        );
836                        let fragment_id = fragment_id.as_global_id();
837                        (fragment_id, fragment)
838                    })
839                    .collect()
840            }
841        };
842
843        // Convert the actor location map to the `Locations` struct.
844        let building_locations = self.build_locations(building_locations);
845
846        // Extract the new fragment relation from the external changes.
847        let upstream_fragment_downstreams = upstream_fragment_changes
848            .into_iter()
849            .map(|(fragment_id, changes)| {
850                (
851                    fragment_id.as_global_id(),
852                    changes
853                        .new_downstreams
854                        .into_iter()
855                        .map(|(downstream_fragment_id, new_dispatch)| {
856                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
857                        })
858                        .collect(),
859                )
860            })
861            .collect();
862
863        // Extract the updates for merge executors from the external changes.
864        let replace_upstream = downstream_fragment_changes
865            .into_iter()
866            .map(|(fragment_id, changes)| {
867                let fragment_id = fragment_id.as_global_id();
868                let new_no_shuffle = &mut new_no_shuffle;
869                (
870                    fragment_id,
871                    changes
872                        .new_upstreams
873                        .into_iter()
874                        .map(
875                            move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
876                                let upstream_fragment_id = upstream_fragment_id.as_global_id();
877                                if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
878                                    && !upstream_new_no_shuffle.is_empty()
879                                {
880                                    let no_shuffle_actors = new_no_shuffle
881                                        .entry(upstream_fragment_id)
882                                        .or_default()
883                                        .entry(fragment_id)
884                                        .or_default();
885                                    no_shuffle_actors.extend(
886                                        upstream_new_no_shuffle.into_iter().map(
887                                            |(upstream_actor_id, actor_id)| {
888                                                (
889                                                    upstream_actor_id.as_global_id(),
890                                                    actor_id.as_global_id(),
891                                                )
892                                            },
893                                        ),
894                                    );
895                                }
896                                let DownstreamExternalEdgeId {
897                                    original_upstream_fragment_id,
898                                    ..
899                                } = edge_id;
900                                (
901                                    original_upstream_fragment_id.as_global_id(),
902                                    upstream_fragment_id,
903                                )
904                            },
905                        )
906                        .collect(),
907                )
908            })
909            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
910            .collect();
911
912        Ok(ActorGraphBuildResult {
913            graph,
914            downstream_fragment_relations,
915            building_locations,
916            upstream_fragment_downstreams,
917            replace_upstream,
918            new_no_shuffle,
919        })
920    }
921
922    /// Build actor graph for each fragment, using topological order.
923    fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
924        let mut state = ActorGraphBuildState::new(id_gen);
925
926        // Use topological sort to build the graph from downstream to upstream. (The first fragment
927        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
928        for fragment_id in self.fragment_graph.topo_order()? {
929            self.build_actor_graph_fragment(fragment_id, &mut state)?;
930        }
931
932        Ok(state.finish())
933    }
934
935    /// Build actor graph for a specific fragment.
936    fn build_actor_graph_fragment(
937        &self,
938        fragment_id: GlobalFragmentId,
939        state: &mut ActorGraphBuildState,
940    ) -> MetaResult<()> {
941        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
942        let distribution = self.get_distribution(fragment_id);
943
944        // First, add or record the actors for the current fragment into the state.
945        let actor_ids = match current_fragment {
946            // For building fragments, we need to generate the actor builders.
947            EitherFragment::Building(current_fragment) => {
948                let node = current_fragment.node.clone().unwrap();
949                state
950                    .inner
951                    .fragment_actor_builders
952                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
953                    .expect("non-duplicate");
954                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
955
956                distribution
957                    .actors()
958                    .map(|alignment_id| {
959                        let actor_id = state.next_actor_id();
960                        let vnode_bitmap = bitmaps
961                            .as_ref()
962                            .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
963                            .cloned();
964
965                        state
966                            .inner
967                            .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
968
969                        actor_id
970                    })
971                    .collect_vec()
972            }
973
974            // For existing fragments, we only need to record the actor locations.
975            EitherFragment::Existing(existing_fragment) => existing_fragment
976                .actors
977                .iter()
978                .map(|(actor_id, actor_info)| {
979                    let actor_id = GlobalActorId::new(*actor_id);
980                    let alignment_id = match &distribution {
981                        Distribution::Singleton(worker_id) => {
982                            ActorAlignmentId::new_single(*worker_id as u32)
983                        }
984                        Distribution::Hash(mapping) => mapping
985                            .get_matched(actor_info.vnode_bitmap.as_ref().unwrap())
986                            .unwrap(),
987                    };
988
989                    state.inner.record_external_location(actor_id, alignment_id);
990
991                    actor_id
992                })
993                .collect_vec(),
994        };
995
996        // Then, add links between the current fragment and its downstream fragments.
997        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
998            let downstream_actors = state
999                .fragment_actors
1000                .get(&downstream_fragment_id)
1001                .expect("downstream fragment not processed yet");
1002
1003            state.inner.add_link(
1004                FragmentLinkNode {
1005                    fragment_id,
1006                    actor_ids: &actor_ids,
1007                },
1008                FragmentLinkNode {
1009                    fragment_id: downstream_fragment_id,
1010                    actor_ids: downstream_actors,
1011                },
1012                edge,
1013            );
1014        }
1015
1016        // Finally, record the actor IDs for the current fragment.
1017        state
1018            .fragment_actors
1019            .try_insert(fragment_id, actor_ids)
1020            .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1021
1022        Ok(())
1023    }
1024}