risingwave_meta/stream/stream_graph/
actor.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount, VnodeCountCompat};
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_common::util::stream_graph_visitor::visit_tables;
25use risingwave_meta_model::WorkerId;
26use risingwave_pb::plan_common::ExprContext;
27use risingwave_pb::stream_plan::stream_node::NodeBody;
28use risingwave_pb::stream_plan::{
29    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
30};
31
32use super::Locations;
33use crate::MetaResult;
34use crate::controller::cluster::StreamingClusterInfo;
35use crate::manager::{MetaSrvEnv, StreamingJob};
36use crate::model::{
37    Fragment, FragmentDownstreamRelation, FragmentId, FragmentNewNoShuffle,
38    FragmentReplaceUpstream, StreamActor,
39};
40use crate::stream::stream_graph::fragment::{
41    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
42    StreamFragmentEdge,
43};
44use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
45use crate::stream::stream_graph::schedule;
46use crate::stream::stream_graph::schedule::Distribution;
47
/// [`ActorBuilder`] builds a stream actor in a stream DAG.
///
/// It only carries the identity of the actor (its own ID and the ID of the fragment it belongs
/// to) plus the optional vnode bitmap; `build` turns these into a [`StreamActor`].
#[derive(Debug)]
struct ActorBuilder {
    /// The ID of this actor.
    actor_id: GlobalActorId,

    /// The fragment ID of this actor.
    fragment_id: GlobalFragmentId,

    /// The virtual node bitmap, if this fragment is hash distributed.
    /// Passed through unchanged to the built [`StreamActor`].
    vnode_bitmap: Option<Bitmap>,
}
60
61impl ActorBuilder {
62    fn new(
63        actor_id: GlobalActorId,
64        fragment_id: GlobalFragmentId,
65        vnode_bitmap: Option<Bitmap>,
66    ) -> Self {
67        Self {
68            actor_id,
69            fragment_id,
70            vnode_bitmap,
71        }
72    }
73}
74
75impl FragmentActorBuilder {
76    /// Rewrite the actor body.
77    ///
78    /// During this process, the following things will be done:
79    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
80    ///    compute nodes.
81    fn rewrite(&self) -> MetaResult<StreamNode> {
82        self.rewrite_inner(&self.node, 0)
83    }
84
85    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
86        match stream_node.get_node_body()? {
87            // Leaf node `Exchange`.
88            NodeBody::Exchange(exchange) => {
89                // The exchange node should always be the bottom of the plan node. If we find one
90                // when the depth is 0, it means that the plan node is not well-formed.
91                if depth == 0 {
92                    bail!(
93                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
94                        stream_node
95                    )
96                }
97                assert!(!stream_node.get_fields().is_empty());
98                assert!(stream_node.input.is_empty());
99
100                // Index the upstreams by the an internal edge ID.
101                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
102                    link_id: stream_node.get_operator_id(),
103                }];
104
105                let upstream_fragment_id = upstream_fragment_id.as_global_id();
106
107                Ok(StreamNode {
108                    node_body: Some(NodeBody::Merge(Box::new({
109                        MergeNode {
110                            upstream_fragment_id,
111                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
112                            ..Default::default()
113                        }
114                    }))),
115                    identity: "MergeExecutor".to_owned(),
116                    ..stream_node.clone()
117                })
118            }
119
120            // "Leaf" node `StreamScan`.
121            NodeBody::StreamScan(stream_scan) => {
122                let input = stream_node.get_input();
123                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
124                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
125                    // and always reads from log store.
126                    assert!(input.is_empty());
127                    return Ok(stream_node.clone());
128                }
129                assert_eq!(input.len(), 2);
130
131                let merge_node = &input[0];
132                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
133                let batch_plan_node = &input[1];
134                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
135
136                // Index the upstreams by the an external edge ID.
137                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
138                    [&EdgeId::UpstreamExternal {
139                        upstream_table_id: stream_scan.table_id.into(),
140                        downstream_fragment_id: self.fragment_id,
141                    }];
142
143                let is_shuffled_backfill = stream_scan.stream_scan_type
144                    == StreamScanType::ArrangementBackfill as i32
145                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
146                if !is_shuffled_backfill {
147                    assert!(upstream_no_shuffle_actor.is_some());
148                }
149
150                let upstream_dispatcher_type = if is_shuffled_backfill {
151                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
152                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
153                    DispatcherType::Hash as _
154                } else {
155                    DispatcherType::NoShuffle as _
156                };
157
158                let upstream_fragment_id = upstream_fragment_id.as_global_id();
159
160                let input = vec![
161                    // Fill the merge node body with correct upstream info.
162                    StreamNode {
163                        node_body: Some(NodeBody::Merge(Box::new({
164                            MergeNode {
165                                upstream_fragment_id,
166                                upstream_dispatcher_type,
167                                ..Default::default()
168                            }
169                        }))),
170                        ..merge_node.clone()
171                    },
172                    batch_plan_node.clone(),
173                ];
174
175                Ok(StreamNode {
176                    input,
177                    ..stream_node.clone()
178                })
179            }
180
181            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
182            // cdc_filter -> backfill -> mview
183            // source_backfill -> mview
184            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
185                let input = stream_node.get_input();
186                assert_eq!(input.len(), 1);
187
188                let merge_node = &input[0];
189                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
190
191                let upstream_source_id = match stream_node.get_node_body()? {
192                    NodeBody::CdcFilter(node) => node.upstream_source_id,
193                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
194                    _ => unreachable!(),
195                };
196
197                // Index the upstreams by the an external edge ID.
198                let (upstream_fragment_id, upstream_actors) = &self.upstreams
199                    [&EdgeId::UpstreamExternal {
200                        upstream_table_id: upstream_source_id.into(),
201                        downstream_fragment_id: self.fragment_id,
202                    }];
203
204                assert!(
205                    upstream_actors.is_some(),
206                    "Upstream Cdc Source should be singleton. \
207                    SourceBackfill is NoShuffle 1-1 correspondence. \
208                    So they both should have only one upstream actor."
209                );
210
211                let upstream_fragment_id = upstream_fragment_id.as_global_id();
212
213                // rewrite the input
214                let input = vec![
215                    // Fill the merge node body with correct upstream info.
216                    StreamNode {
217                        node_body: Some(NodeBody::Merge(Box::new({
218                            MergeNode {
219                                upstream_fragment_id,
220                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
221                                ..Default::default()
222                            }
223                        }))),
224                        ..merge_node.clone()
225                    },
226                ];
227                Ok(StreamNode {
228                    input,
229                    ..stream_node.clone()
230                })
231            }
232
233            // For other nodes, visit the children recursively.
234            _ => {
235                let mut new_stream_node = stream_node.clone();
236                for (input, new_input) in stream_node
237                    .input
238                    .iter()
239                    .zip_eq_fast(&mut new_stream_node.input)
240                {
241                    *new_input = self.rewrite_inner(input, depth + 1)?;
242                }
243                Ok(new_stream_node)
244            }
245        }
246    }
247}
248
249impl ActorBuilder {
250    /// Build an actor after all the upstreams and downstreams are processed.
251    fn build(self, job: &StreamingJob, expr_context: ExprContext) -> MetaResult<StreamActor> {
252        // Only fill the definition when debug assertions enabled, otherwise use name instead.
253        #[cfg(not(debug_assertions))]
254        let mview_definition = job.name();
255        #[cfg(debug_assertions)]
256        let mview_definition = job.definition();
257
258        Ok(StreamActor {
259            actor_id: self.actor_id.as_global_id(),
260            fragment_id: self.fragment_id.as_global_id(),
261            vnode_bitmap: self.vnode_bitmap,
262            mview_definition,
263            expr_context: Some(expr_context),
264        })
265    }
266}
267
/// The required changes to an existing external upstream fragment to build the graph of a
/// streaming job.
///
/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
/// to the upstream fragment, by adding new dispatchers.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The new downstreams to be added: downstream fragment ID -> the dispatch strategy used to
    /// reach it.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
277
/// The required changes to an existing external downstream fragment to build the graph of a
/// streaming job, i.e. the new upstreams it will get.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
    /// `edge_id` -> (new upstream fragment id, the no-shuffle actor mapping if one was recorded
    /// for the edge)
    new_upstreams:
        HashMap<DownstreamExternalEdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
285
286impl UpstreamFragmentChange {
287    /// Add a dispatcher to the external actor.
288    fn add_dispatcher(
289        &mut self,
290        downstream_fragment_id: GlobalFragmentId,
291        dispatch: DispatchStrategy,
292    ) {
293        self.new_downstreams
294            .try_insert(downstream_fragment_id, dispatch)
295            .unwrap();
296    }
297}
298
299impl DownstreamFragmentChange {
300    /// Add an upstream to the external actor.
301    fn add_upstream(
302        &mut self,
303        edge_id: DownstreamExternalEdgeId,
304        new_upstream_fragment_id: GlobalFragmentId,
305        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
306    ) {
307        self.new_upstreams
308            .try_insert(
309                edge_id,
310                (new_upstream_fragment_id, no_shuffle_actor_mapping),
311            )
312            .unwrap();
313    }
314}
315
/// The location of actors.
type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
/// The actor pairs of a no-shuffle edge: no_shuffle upstream actor_id -> actor_id
type NewExternalNoShuffle = HashMap<GlobalActorId, GlobalActorId>;
320
/// The builder of one fragment to be built: its stream node, the builders of its actors, and the
/// fragment-level upstream / downstream relations collected while linking the graph.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The ID of the fragment.
    fragment_id: GlobalFragmentId,
    /// The stream node of the fragment, which is the input of `rewrite`.
    node: StreamNode,
    /// The builders of the actors of this fragment, indexed by actor ID.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// downstream fragment_id -> the dispatch strategy towards it
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    /// edge_id -> (upstream fragment_id, no shuffle actor pairs if it's no shuffle dispatched)
    upstreams: HashMap<EdgeId, (GlobalFragmentId, Option<NewExternalNoShuffle>)>,
}
330
331impl FragmentActorBuilder {
332    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
333        Self {
334            fragment_id,
335            node,
336            actor_builders: Default::default(),
337            downstreams: Default::default(),
338            upstreams: Default::default(),
339        }
340    }
341}
342
/// The actual mutable state of building an actor graph.
///
/// When the fragments are visited in a topological order, actor builders will be added to this
/// state and the scheduled locations will be added. As the building process is run on the
/// **complete graph** which also contains the info of the existing (external) fragments, the info
/// of them will be also recorded.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the actors to be built, grouped by the fragment they belong to.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The scheduled locations of the actors to be built.
    building_locations: ActorLocations,

    /// The required changes to the external downstream fragment. See [`DownstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external fragments that get new upstreams.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The required changes to the external upstream fragment. See [`UpstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external fragments that get new downstreams.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// The actual locations of the external actors.
    external_locations: ActorLocations,
}
368
/// The information of a fragment, used for parameter passing for `Inner::add_link`.
struct FragmentLinkNode<'a> {
    /// The ID of the fragment.
    fragment_id: GlobalFragmentId,
    /// The IDs of the actors of the fragment, borrowed from the caller.
    actor_ids: &'a [GlobalActorId],
}
374
375impl ActorGraphBuildStateInner {
376    /// Insert new generated actor and record its location.
377    ///
378    /// The `vnode_bitmap` should be `Some` for the actors of hash-distributed fragments.
379    fn add_actor(
380        &mut self,
381        (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
382        actor_alignment_id: ActorAlignmentId,
383        vnode_bitmap: Option<Bitmap>,
384    ) {
385        self.fragment_actor_builders
386            .get_mut(&fragment_id)
387            .expect("should added previously")
388            .actor_builders
389            .try_insert(
390                actor_id,
391                ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
392            )
393            .unwrap();
394
395        self.building_locations
396            .try_insert(actor_id, actor_alignment_id)
397            .unwrap();
398    }
399
400    /// Record the location of an external actor.
401    fn record_external_location(
402        &mut self,
403        actor_id: GlobalActorId,
404        actor_alignment_id: ActorAlignmentId,
405    ) {
406        self.external_locations
407            .try_insert(actor_id, actor_alignment_id)
408            .unwrap();
409    }
410
411    /// Add the new downstream fragment relation to a fragment.
412    ///
413    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
414    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
415    fn add_dispatcher(
416        &mut self,
417        fragment_id: GlobalFragmentId,
418        downstream_fragment_id: GlobalFragmentId,
419        dispatch: DispatchStrategy,
420    ) {
421        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
422            builder
423                .downstreams
424                .try_insert(downstream_fragment_id, dispatch)
425                .unwrap();
426        } else {
427            self.upstream_fragment_changes
428                .entry(fragment_id)
429                .or_default()
430                .add_dispatcher(downstream_fragment_id, dispatch);
431        }
432    }
433
434    /// Add the new upstream for an actor.
435    ///
436    /// - If the actor is to be built, the upstream will be added to the actor builder.
437    /// - If the actor is an external actor, the upstream will be added to the external changes.
438    fn add_upstream(
439        &mut self,
440        fragment_id: GlobalFragmentId,
441        edge_id: EdgeId,
442        upstream_fragment_id: GlobalFragmentId,
443        no_shuffle_actor_mapping: Option<HashMap<GlobalActorId, GlobalActorId>>,
444    ) {
445        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
446            builder
447                .upstreams
448                .try_insert(edge_id, (upstream_fragment_id, no_shuffle_actor_mapping))
449                .unwrap();
450        } else {
451            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
452                unreachable!("edge from internal to external must be `DownstreamExternal`")
453            };
454            self.downstream_fragment_changes
455                .entry(fragment_id)
456                .or_default()
457                .add_upstream(edge_id, upstream_fragment_id, no_shuffle_actor_mapping);
458        }
459    }
460
461    /// Get the location of an actor. Will look up the location map of both the actors to be built
462    /// and the external actors.
463    fn get_location(&self, actor_id: GlobalActorId) -> ActorAlignmentId {
464        self.building_locations
465            .get(&actor_id)
466            .copied()
467            .or_else(|| self.external_locations.get(&actor_id).copied())
468            .unwrap()
469    }
470
471    /// Add a "link" between two fragments in the graph.
472    ///
473    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
474    /// based on the distribution and the dispatch strategy. They will be
475    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
476    ///
477    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
478    /// instead of the actor builders.
479    fn add_link<'a>(
480        &mut self,
481        upstream: FragmentLinkNode<'a>,
482        downstream: FragmentLinkNode<'a>,
483        edge: &'a StreamFragmentEdge,
484    ) {
485        let dt = edge.dispatch_strategy.r#type();
486
487        match dt {
488            // For `NoShuffle`, make n "1-1" links between the actors.
489            DispatcherType::NoShuffle => {
490                assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
491                let upstream_locations: HashMap<_, _> = upstream
492                    .actor_ids
493                    .iter()
494                    .map(|id| (self.get_location(*id), *id))
495                    .collect();
496                let downstream_locations: HashMap<_, _> = downstream
497                    .actor_ids
498                    .iter()
499                    .map(|id| (self.get_location(*id), *id))
500                    .collect();
501
502                // Create a new dispatcher just between these two actors.
503                self.add_dispatcher(
504                    upstream.fragment_id,
505                    downstream.fragment_id,
506                    edge.dispatch_strategy.clone(),
507                );
508
509                // Also record the upstream for the downstream actor.
510                self.add_upstream(
511                    downstream.fragment_id,
512                    edge.id,
513                    upstream.fragment_id,
514                    Some(
515                        downstream_locations
516                            .iter()
517                            .map(|(location, downstream_actor_id)| {
518                                let upstream_actor_id = upstream_locations.get(location).unwrap();
519                                (*upstream_actor_id, *downstream_actor_id)
520                            })
521                            .collect(),
522                    ),
523                );
524            }
525
526            // Otherwise, make m * n links between the actors.
527            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
528                self.add_dispatcher(
529                    upstream.fragment_id,
530                    downstream.fragment_id,
531                    edge.dispatch_strategy.clone(),
532                );
533                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, None);
534            }
535
536            DispatcherType::Unspecified => unreachable!(),
537        }
538    }
539}
540
/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
///
/// Wraps the inner state together with the bookkeeping used to allocate actor IDs.
struct ActorGraphBuildState {
    /// The actual state.
    inner: ActorGraphBuildStateInner,

    /// The actor IDs of each fragment.
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// The next local actor id to use. Starts from 0 and is increased by one for each allocated
    /// actor id; `finish` asserts that it equals the length of the id generator.
    next_local_id: u32,

    /// The global actor id generator.
    actor_id_gen: GlobalActorIdGen,
}
555
556impl ActorGraphBuildState {
557    /// Create an empty state with the given id generator.
558    fn new(actor_id_gen: GlobalActorIdGen) -> Self {
559        Self {
560            inner: Default::default(),
561            fragment_actors: Default::default(),
562            next_local_id: 0,
563            actor_id_gen,
564        }
565    }
566
567    /// Get the next global actor id.
568    fn next_actor_id(&mut self) -> GlobalActorId {
569        let local_id = self.next_local_id;
570        self.next_local_id += 1;
571
572        self.actor_id_gen.to_global_id(local_id)
573    }
574
575    /// Finish the build and return the inner state.
576    fn finish(self) -> ActorGraphBuildStateInner {
577        // Assert that all the actors are built.
578        assert_eq!(self.actor_id_gen.len(), self.next_local_id);
579
580        self.inner
581    }
582}
583
/// The result of a built actor graph. Will be further embedded into the `Context` for building
/// actors on the compute nodes.
pub struct ActorGraphBuildResult {
    /// The graph of sealed fragments, including all actors.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream fragments of the fragments from the new graph to be created.
    /// Including the fragment relation to external downstream fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The scheduled locations of the actors to be built.
    pub building_locations: Locations,

    /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The updates to be applied to the downstream fragment merge node. Used for schema change (replace
    /// table plan).
    pub replace_upstream: FragmentReplaceUpstream,

    /// The new no shuffle added to create the new streaming job, including the no shuffle from existing fragments to
    /// the newly created fragments, between two newly created fragments, and from newly created fragments to existing
    /// downstream fragments (for create sink into table and replace table).
    ///
    /// Indexed by the upstream fragment id, then by the downstream fragment id; the innermost map
    /// goes from the upstream actor id to the downstream actor id.
    pub new_no_shuffle: FragmentNewNoShuffle,
}
608
/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
/// current cluster info and the required parallelism.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The pre-scheduled distribution for each building fragment, as returned by the scheduler
    /// in `new`.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The actual distribution for each existing fragment, taken from the fragment graph in `new`.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph. The vnode counts of the internal tables of its building
    /// fragments are filled in `new`.
    fragment_graph: CompleteStreamFragmentGraph,

    /// The cluster info for creating a streaming job.
    cluster_info: StreamingClusterInfo,
}
625
626impl ActorGraphBuilder {
627    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
628    /// graph is failed to be scheduled.
629    pub fn new(
630        streaming_job_id: u32,
631        resource_group: String,
632        fragment_graph: CompleteStreamFragmentGraph,
633        cluster_info: StreamingClusterInfo,
634        default_parallelism: NonZeroUsize,
635    ) -> MetaResult<Self> {
636        let expected_vnode_count = fragment_graph.max_parallelism();
637        let existing_distributions = fragment_graph.existing_distribution();
638
639        let schedulable_workers =
640            cluster_info.filter_schedulable_workers_by_resource_group(&resource_group);
641
642        let scheduler = schedule::Scheduler::new(
643            streaming_job_id,
644            &schedulable_workers,
645            default_parallelism,
646            expected_vnode_count,
647        )?;
648
649        let distributions = scheduler.schedule(&fragment_graph)?;
650
651        // Fill the vnode count for each internal table, based on schedule result.
652        let mut fragment_graph = fragment_graph;
653        for (id, fragment) in fragment_graph.building_fragments_mut() {
654            let mut error = None;
655            let fragment_vnode_count = distributions[id].vnode_count();
656            visit_tables(fragment, |table, _| {
657                if error.is_some() {
658                    return;
659                }
660                // There are special cases where a hash-distributed fragment contains singleton
661                // internal tables, e.g., the state table of `Source` executors.
662                let vnode_count = if table.is_singleton() {
663                    if fragment_vnode_count > 1 {
664                        tracing::info!(
665                            table.name,
666                            "found singleton table in hash-distributed fragment"
667                        );
668                    }
669                    1
670                } else {
671                    fragment_vnode_count
672                };
673                match table.vnode_count_inner().value_opt() {
674                    // Vnode count of this table is not set to placeholder, meaning that we are replacing
675                    // a streaming job, and the existing state table requires a specific vnode count.
676                    // Check if it's the same with what we derived from the schedule result.
677                    //
678                    // Typically, inconsistency should not happen as we force to align the vnode count
679                    // when planning the new streaming job in the frontend.
680                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
681                        error = Some(format!(
682                            "failed to align vnode count for table {}({}): required {}, but got {}",
683                            table.id, table.name, required_vnode_count, vnode_count
684                        ));
685                    }
686                    // Normal cases.
687                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
688                }
689            });
690            if let Some(error) = error {
691                bail!(error);
692            }
693        }
694
695        Ok(Self {
696            distributions,
697            existing_distributions,
698            fragment_graph,
699            cluster_info,
700        })
701    }
702
703    /// Get the distribution of the given fragment. Will look up the distribution map of both the
704    /// building and existing fragments.
705    fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
706        self.distributions
707            .get(&fragment_id)
708            .or_else(|| self.existing_distributions.get(&fragment_id))
709            .unwrap()
710    }
711
712    /// Convert the actor location map to the [`Locations`] struct.
713    fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
714        let actor_locations = actor_locations
715            .into_iter()
716            .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
717            .collect();
718
719        let worker_locations = self
720            .cluster_info
721            .worker_nodes
722            .iter()
723            .map(|(id, node)| (*id as WorkerId, node.clone()))
724            .collect();
725
726        Locations {
727            actor_locations,
728            worker_locations,
729        }
730    }
731
732    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
733    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
734    pub fn generate_graph(
735        self,
736        env: &MetaSrvEnv,
737        job: &StreamingJob,
738        expr_context: ExprContext,
739    ) -> MetaResult<ActorGraphBuildResult> {
740        // Pre-generate IDs for all actors.
741        let actor_len = self
742            .distributions
743            .values()
744            .map(|d| d.parallelism())
745            .sum::<usize>() as u64;
746        let id_gen = GlobalActorIdGen::new(env.id_gen_manager(), actor_len);
747
748        // Build the actor graph and get the final state.
749        let ActorGraphBuildStateInner {
750            fragment_actor_builders,
751            building_locations,
752            downstream_fragment_changes,
753            upstream_fragment_changes,
754            external_locations,
755        } = self.build_actor_graph(id_gen)?;
756
757        for alignment_id in external_locations.values() {
758            if self
759                .cluster_info
760                .unschedulable_workers
761                .contains(&alignment_id.worker_id())
762            {
763                bail!(
764                    "The worker {} where the associated upstream is located is unschedulable",
765                    alignment_id.worker_id(),
766                );
767            }
768        }
769
770        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
771        let mut new_no_shuffle: FragmentNewNoShuffle = HashMap::new();
772        // Serialize the graph into a map of sealed fragments.
773        let graph = {
774            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
775                HashMap::new();
776
777            // As all fragments are processed, we can now `build` the actors where the `Exchange`
778            // and `Chain` are rewritten.
779            for (fragment_id, builder) in fragment_actor_builders {
780                let global_fragment_id = fragment_id.as_global_id();
781                let node = builder.rewrite()?;
782                for (upstream_fragment_id, no_shuffle_upstream) in builder.upstreams.into_values() {
783                    if let Some(no_shuffle_upstream) = no_shuffle_upstream {
784                        new_no_shuffle
785                            .entry(upstream_fragment_id.as_global_id())
786                            .or_default()
787                            .try_insert(
788                                global_fragment_id,
789                                no_shuffle_upstream
790                                    .iter()
791                                    .map(|(upstream_actor_id, actor_id)| {
792                                        (upstream_actor_id.as_global_id(), actor_id.as_global_id())
793                                    })
794                                    .collect(),
795                            )
796                            .expect("non-duplicate");
797                    }
798                }
799                downstream_fragment_relations
800                    .try_insert(
801                        global_fragment_id,
802                        builder
803                            .downstreams
804                            .into_iter()
805                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
806                            .collect(),
807                    )
808                    .expect("non-duplicate");
809                fragment_actors
810                    .try_insert(
811                        fragment_id,
812                        (
813                            node,
814                            builder
815                                .actor_builders
816                                .into_values()
817                                .map(|builder| builder.build(job, expr_context.clone()))
818                                .try_collect()?,
819                        ),
820                    )
821                    .expect("non-duplicate");
822            }
823
824            {
825                fragment_actors
826                    .into_iter()
827                    .map(|(fragment_id, (stream_node, actors))| {
828                        let distribution = self.distributions[&fragment_id].clone();
829                        let fragment = self.fragment_graph.seal_fragment(
830                            fragment_id,
831                            actors,
832                            distribution,
833                            stream_node,
834                        );
835                        let fragment_id = fragment_id.as_global_id();
836                        (fragment_id, fragment)
837                    })
838                    .collect()
839            }
840        };
841
842        // Convert the actor location map to the `Locations` struct.
843        let building_locations = self.build_locations(building_locations);
844
845        // Extract the new fragment relation from the external changes.
846        let upstream_fragment_downstreams = upstream_fragment_changes
847            .into_iter()
848            .map(|(fragment_id, changes)| {
849                (
850                    fragment_id.as_global_id(),
851                    changes
852                        .new_downstreams
853                        .into_iter()
854                        .map(|(downstream_fragment_id, new_dispatch)| {
855                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
856                        })
857                        .collect(),
858                )
859            })
860            .collect();
861
862        // Extract the updates for merge executors from the external changes.
863        let replace_upstream = downstream_fragment_changes
864            .into_iter()
865            .map(|(fragment_id, changes)| {
866                let fragment_id = fragment_id.as_global_id();
867                let new_no_shuffle = &mut new_no_shuffle;
868                (
869                    fragment_id,
870                    changes
871                        .new_upstreams
872                        .into_iter()
873                        .map(
874                            move |(edge_id, (upstream_fragment_id, upstream_new_no_shuffle))| {
875                                let upstream_fragment_id = upstream_fragment_id.as_global_id();
876                                if let Some(upstream_new_no_shuffle) = upstream_new_no_shuffle
877                                    && !upstream_new_no_shuffle.is_empty()
878                                {
879                                    let no_shuffle_actors = new_no_shuffle
880                                        .entry(upstream_fragment_id)
881                                        .or_default()
882                                        .entry(fragment_id)
883                                        .or_default();
884                                    no_shuffle_actors.extend(
885                                        upstream_new_no_shuffle.into_iter().map(
886                                            |(upstream_actor_id, actor_id)| {
887                                                (
888                                                    upstream_actor_id.as_global_id(),
889                                                    actor_id.as_global_id(),
890                                                )
891                                            },
892                                        ),
893                                    );
894                                }
895                                let DownstreamExternalEdgeId {
896                                    original_upstream_fragment_id,
897                                    ..
898                                } = edge_id;
899                                (
900                                    original_upstream_fragment_id.as_global_id(),
901                                    upstream_fragment_id,
902                                )
903                            },
904                        )
905                        .collect(),
906                )
907            })
908            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
909            .collect();
910
911        Ok(ActorGraphBuildResult {
912            graph,
913            downstream_fragment_relations,
914            building_locations,
915            upstream_fragment_downstreams,
916            replace_upstream,
917            new_no_shuffle,
918        })
919    }
920
921    /// Build actor graph for each fragment, using topological order.
922    fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
923        let mut state = ActorGraphBuildState::new(id_gen);
924
925        // Use topological sort to build the graph from downstream to upstream. (The first fragment
926        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
927        for fragment_id in self.fragment_graph.topo_order()? {
928            self.build_actor_graph_fragment(fragment_id, &mut state)?;
929        }
930
931        Ok(state.finish())
932    }
933
934    /// Build actor graph for a specific fragment.
935    fn build_actor_graph_fragment(
936        &self,
937        fragment_id: GlobalFragmentId,
938        state: &mut ActorGraphBuildState,
939    ) -> MetaResult<()> {
940        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
941        let distribution = self.get_distribution(fragment_id);
942
943        // First, add or record the actors for the current fragment into the state.
944        let actor_ids = match current_fragment {
945            // For building fragments, we need to generate the actor builders.
946            EitherFragment::Building(current_fragment) => {
947                let node = current_fragment.node.clone().unwrap();
948                state
949                    .inner
950                    .fragment_actor_builders
951                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
952                    .expect("non-duplicate");
953                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
954
955                distribution
956                    .actors()
957                    .map(|alignment_id| {
958                        let actor_id = state.next_actor_id();
959                        let vnode_bitmap = bitmaps
960                            .as_ref()
961                            .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
962                            .cloned();
963
964                        state
965                            .inner
966                            .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
967
968                        actor_id
969                    })
970                    .collect_vec()
971            }
972
973            // For existing fragments, we only need to record the actor locations.
974            EitherFragment::Existing(existing_fragment) => existing_fragment
975                .actors
976                .iter()
977                .map(|a| {
978                    let actor_id = GlobalActorId::new(a.actor_id);
979                    let alignment_id = match &distribution {
980                        Distribution::Singleton(worker_id) => {
981                            ActorAlignmentId::new_single(*worker_id as u32)
982                        }
983                        Distribution::Hash(mapping) => mapping
984                            .get_matched(a.vnode_bitmap.as_ref().unwrap())
985                            .unwrap(),
986                    };
987
988                    state.inner.record_external_location(actor_id, alignment_id);
989
990                    actor_id
991                })
992                .collect_vec(),
993        };
994
995        // Then, add links between the current fragment and its downstream fragments.
996        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
997            let downstream_actors = state
998                .fragment_actors
999                .get(&downstream_fragment_id)
1000                .expect("downstream fragment not processed yet");
1001
1002            state.inner.add_link(
1003                FragmentLinkNode {
1004                    fragment_id,
1005                    actor_ids: &actor_ids,
1006                },
1007                FragmentLinkNode {
1008                    fragment_id: downstream_fragment_id,
1009                    actor_ids: downstream_actors,
1010                },
1011                edge,
1012            );
1013        }
1014
1015        // Finally, record the actor IDs for the current fragment.
1016        state
1017            .fragment_actors
1018            .try_insert(fragment_id, actor_ids)
1019            .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
1020
1021        Ok(())
1022    }
1023}