risingwave_meta/stream/stream_graph/
actor.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::num::NonZeroUsize;
17
18use assert_matches::assert_matches;
19use itertools::Itertools;
20use risingwave_common::bail;
21use risingwave_common::bitmap::Bitmap;
22use risingwave_common::hash::{ActorAlignmentId, IsSingleton, VnodeCount, VnodeCountCompat};
23use risingwave_common::id::JobId;
24use risingwave_common::util::iter_util::ZipEqFast;
25use risingwave_common::util::stream_graph_visitor::visit_tables;
26use risingwave_pb::stream_plan::stream_node::NodeBody;
27use risingwave_pb::stream_plan::{
28    DispatchStrategy, DispatcherType, MergeNode, StreamNode, StreamScanType,
29};
30
31use super::Locations;
32use crate::MetaResult;
33use crate::controller::cluster::StreamingClusterInfo;
34use crate::manager::{MetaSrvEnv, StreamingJob};
35use crate::model::{
36    Fragment, FragmentDownstreamRelation, FragmentId, FragmentReplaceUpstream, StreamActor,
37    StreamContext,
38};
39use crate::stream::stream_graph::fragment::{
40    CompleteStreamFragmentGraph, DownstreamExternalEdgeId, EdgeId, EitherFragment,
41    StreamFragmentEdge,
42};
43use crate::stream::stream_graph::id::{GlobalActorId, GlobalActorIdGen, GlobalFragmentId};
44use crate::stream::stream_graph::schedule;
45use crate::stream::stream_graph::schedule::Distribution;
46
/// [`ActorBuilder`] builds a stream actor in a stream DAG.
///
/// It only carries the identity of the actor (its own ID and the ID of its fragment) and the
/// optional vnode bitmap; the remaining fields of the resulting [`StreamActor`] are derived from
/// the job and the stream context when `build` is called.
#[derive(Debug)]
struct ActorBuilder {
    /// The ID of this actor.
    actor_id: GlobalActorId,

    /// The fragment ID of this actor.
    fragment_id: GlobalFragmentId,

    /// The virtual node bitmap, if this fragment is hash distributed.
    vnode_bitmap: Option<Bitmap>,
}
59
60impl ActorBuilder {
61    fn new(
62        actor_id: GlobalActorId,
63        fragment_id: GlobalFragmentId,
64        vnode_bitmap: Option<Bitmap>,
65    ) -> Self {
66        Self {
67            actor_id,
68            fragment_id,
69            vnode_bitmap,
70        }
71    }
72}
73
74impl FragmentActorBuilder {
75    /// Rewrite the actor body.
76    ///
77    /// During this process, the following things will be done:
78    /// 1. Replace the logical `Exchange` in node's input with `Merge`, which can be executed on the
79    ///    compute nodes.
80    fn rewrite(&self) -> MetaResult<StreamNode> {
81        self.rewrite_inner(&self.node, 0)
82    }
83
84    fn rewrite_inner(&self, stream_node: &StreamNode, depth: usize) -> MetaResult<StreamNode> {
85        match stream_node.get_node_body()? {
86            // Leaf node `Exchange`.
87            NodeBody::Exchange(exchange) => {
88                // The exchange node should always be the bottom of the plan node. If we find one
89                // when the depth is 0, it means that the plan node is not well-formed.
90                if depth == 0 {
91                    bail!(
92                        "there should be no ExchangeNode on the top of the plan node: {:#?}",
93                        stream_node
94                    )
95                }
96                assert!(!stream_node.get_fields().is_empty());
97                assert!(stream_node.input.is_empty());
98
99                // Index the upstreams by the an internal edge ID.
100                let (upstream_fragment_id, _) = &self.upstreams[&EdgeId::Internal {
101                    link_id: stream_node.get_operator_id().as_raw_id(),
102                }];
103
104                let upstream_fragment_id = upstream_fragment_id.as_global_id();
105
106                Ok(StreamNode {
107                    node_body: Some(NodeBody::Merge(Box::new({
108                        MergeNode {
109                            upstream_fragment_id,
110                            upstream_dispatcher_type: exchange.get_strategy()?.r#type,
111                            ..Default::default()
112                        }
113                    }))),
114                    identity: "MergeExecutor".to_owned(),
115                    ..stream_node.clone()
116                })
117            }
118
119            // "Leaf" node `StreamScan`.
120            NodeBody::StreamScan(stream_scan) => {
121                let input = stream_node.get_input();
122                if stream_scan.stream_scan_type() == StreamScanType::CrossDbSnapshotBackfill {
123                    // CrossDbSnapshotBackfill is a special case, which doesn't have any upstream actor
124                    // and always reads from log store.
125                    assert!(input.is_empty());
126                    return Ok(stream_node.clone());
127                }
128                assert_eq!(input.len(), 2);
129
130                let merge_node = &input[0];
131                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
132                let batch_plan_node = &input[1];
133                assert_matches!(batch_plan_node.node_body, Some(NodeBody::BatchPlan(_)));
134
135                // Index the upstreams by the an external edge ID.
136                let (upstream_fragment_id, upstream_no_shuffle_actor) = &self.upstreams
137                    [&EdgeId::UpstreamExternal {
138                        upstream_job_id: stream_scan.table_id.as_job_id(),
139                        downstream_fragment_id: self.fragment_id,
140                    }];
141
142                let is_shuffled_backfill = stream_scan.stream_scan_type
143                    == StreamScanType::ArrangementBackfill as i32
144                    || stream_scan.stream_scan_type == StreamScanType::SnapshotBackfill as i32;
145                if !is_shuffled_backfill {
146                    assert!(*upstream_no_shuffle_actor);
147                }
148
149                let upstream_dispatcher_type = if is_shuffled_backfill {
150                    // FIXME(kwannoel): Should the upstream dispatcher type depends on the upstream distribution?
151                    // If singleton, use `Simple` dispatcher, otherwise use `Hash` dispatcher.
152                    DispatcherType::Hash as _
153                } else {
154                    DispatcherType::NoShuffle as _
155                };
156
157                let upstream_fragment_id = upstream_fragment_id.as_global_id();
158
159                let input = vec![
160                    // Fill the merge node body with correct upstream info.
161                    StreamNode {
162                        node_body: Some(NodeBody::Merge(Box::new({
163                            MergeNode {
164                                upstream_fragment_id,
165                                upstream_dispatcher_type,
166                                ..Default::default()
167                            }
168                        }))),
169                        ..merge_node.clone()
170                    },
171                    batch_plan_node.clone(),
172                ];
173
174                Ok(StreamNode {
175                    input,
176                    ..stream_node.clone()
177                })
178            }
179
180            // "Leaf" node `CdcFilter` and `SourceBackfill`. They both `Merge` an upstream `Source`
181            // cdc_filter -> backfill -> mview
182            // source_backfill -> mview
183            NodeBody::CdcFilter(_) | NodeBody::SourceBackfill(_) => {
184                let input = stream_node.get_input();
185                assert_eq!(input.len(), 1);
186
187                let merge_node = &input[0];
188                assert_matches!(merge_node.node_body, Some(NodeBody::Merge(_)));
189
190                let upstream_source_id = match stream_node.get_node_body()? {
191                    NodeBody::CdcFilter(node) => node.upstream_source_id,
192                    NodeBody::SourceBackfill(node) => node.upstream_source_id,
193                    _ => unreachable!(),
194                };
195
196                // Index the upstreams by the an external edge ID.
197                let (upstream_fragment_id, upstream_is_no_shuffle) = &self.upstreams
198                    [&EdgeId::UpstreamExternal {
199                        upstream_job_id: upstream_source_id.as_share_source_job_id(),
200                        downstream_fragment_id: self.fragment_id,
201                    }];
202
203                assert!(
204                    *upstream_is_no_shuffle,
205                    "Upstream Cdc Source should be singleton. \
206                    SourceBackfill is NoShuffle 1-1 correspondence. \
207                    So they both should have only one upstream actor."
208                );
209
210                let upstream_fragment_id = upstream_fragment_id.as_global_id();
211
212                // rewrite the input
213                let input = vec![
214                    // Fill the merge node body with correct upstream info.
215                    StreamNode {
216                        node_body: Some(NodeBody::Merge(Box::new({
217                            MergeNode {
218                                upstream_fragment_id,
219                                upstream_dispatcher_type: DispatcherType::NoShuffle as _,
220                                ..Default::default()
221                            }
222                        }))),
223                        ..merge_node.clone()
224                    },
225                ];
226                Ok(StreamNode {
227                    input,
228                    ..stream_node.clone()
229                })
230            }
231
232            // For other nodes, visit the children recursively.
233            _ => {
234                let mut new_stream_node = stream_node.clone();
235                for (input, new_input) in stream_node
236                    .input
237                    .iter()
238                    .zip_eq_fast(&mut new_stream_node.input)
239                {
240                    *new_input = self.rewrite_inner(input, depth + 1)?;
241                }
242                Ok(new_stream_node)
243            }
244        }
245    }
246}
247
248impl ActorBuilder {
249    /// Build an actor after all the upstreams and downstreams are processed.
250    fn build(self, job: &StreamingJob, ctx: &StreamContext) -> MetaResult<StreamActor> {
251        // Only fill the definition when debug assertions enabled, otherwise use name instead.
252        #[cfg(not(debug_assertions))]
253        let mview_definition = job.name();
254        #[cfg(debug_assertions)]
255        let mview_definition = job.definition();
256
257        Ok(StreamActor {
258            actor_id: self.actor_id.as_global_id(),
259            fragment_id: self.fragment_id.as_global_id(),
260            vnode_bitmap: self.vnode_bitmap,
261            mview_definition,
262            expr_context: Some(ctx.to_expr_context()),
263            config_override: ctx.config_override.clone(),
264        })
265    }
266}
267
/// The required changes to an existing external upstream fragment to build the graph of a
/// streaming job.
///
/// For example, when we're creating an mview on an existing mview, we need to add new downstreams
/// to the upstream actors, by adding new dispatchers.
#[derive(Default)]
struct UpstreamFragmentChange {
    /// The new downstreams to be added: downstream fragment ID -> the dispatch strategy used to
    /// reach that fragment.
    new_downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
}
277
/// The required changes to an existing external downstream fragment to build the graph of a
/// streaming job.
///
/// It records, for each external edge, the new upstream fragment that takes the place of the
/// original upstream of that edge.
#[derive(Default)]
struct DownstreamFragmentChange {
    /// The new upstreams to be added (replaced), indexed by the edge id to upstream fragment.
    /// `edge_id` -> new upstream fragment id
    new_upstreams: HashMap<DownstreamExternalEdgeId, GlobalFragmentId>,
}
284
285impl UpstreamFragmentChange {
286    /// Add a dispatcher to the external actor.
287    fn add_dispatcher(
288        &mut self,
289        downstream_fragment_id: GlobalFragmentId,
290        dispatch: DispatchStrategy,
291    ) {
292        self.new_downstreams
293            .try_insert(downstream_fragment_id, dispatch)
294            .unwrap();
295    }
296}
297
298impl DownstreamFragmentChange {
299    /// Add an upstream to the external actor.
300    fn add_upstream(
301        &mut self,
302        edge_id: DownstreamExternalEdgeId,
303        new_upstream_fragment_id: GlobalFragmentId,
304    ) {
305        self.new_upstreams
306            .try_insert(edge_id, new_upstream_fragment_id)
307            .unwrap();
308    }
309}
310
/// The location of actors: actor ID -> the alignment ID that the actor is assigned to.
type ActorLocations = BTreeMap<GlobalActorId, ActorAlignmentId>;
313
/// The builder of a fragment to be built, holding the fragment's plan node, the builders of its
/// actors and the fragment relations (upstreams and downstreams) collected so far.
#[derive(Debug)]
struct FragmentActorBuilder {
    /// The ID of the fragment being built.
    fragment_id: GlobalFragmentId,
    /// The plan node of the fragment, which is the input of `rewrite`.
    node: StreamNode,
    /// The builders of the actors in this fragment, indexed by actor ID.
    actor_builders: BTreeMap<GlobalActorId, ActorBuilder>,
    /// downstream fragment_id -> the dispatch strategy towards that fragment
    downstreams: HashMap<GlobalFragmentId, DispatchStrategy>,
    /// edge_id -> (upstream fragment_id, whether the edge is no shuffle)
    upstreams: HashMap<EdgeId, (GlobalFragmentId, bool)>,
}
323
324impl FragmentActorBuilder {
325    fn new(fragment_id: GlobalFragmentId, node: StreamNode) -> Self {
326        Self {
327            fragment_id,
328            node,
329            actor_builders: Default::default(),
330            downstreams: Default::default(),
331            upstreams: Default::default(),
332        }
333    }
334}
335
/// The actual mutable state of building an actor graph.
///
/// When the fragments are visited in a topological order, actor builders will be added to this
/// state and the scheduled locations will be added. As the building process is run on the
/// **complete graph** which also contains the info of the existing (external) fragments, the info
/// of them will be also recorded.
#[derive(Default)]
struct ActorGraphBuildStateInner {
    /// The builders of the fragments (and of their actors) to be built, indexed by fragment ID.
    fragment_actor_builders: BTreeMap<GlobalFragmentId, FragmentActorBuilder>,

    /// The scheduled locations of the actors to be built.
    building_locations: ActorLocations,

    /// The required changes to the external downstream fragment. See [`DownstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external downstream fragment whose upstreams are
    /// updated.
    downstream_fragment_changes: BTreeMap<GlobalFragmentId, DownstreamFragmentChange>,

    /// The required changes to the external upstream fragment. See [`UpstreamFragmentChange`].
    /// Indexed by the `fragment_id` of the external upstream fragment that gains new downstreams.
    upstream_fragment_changes: BTreeMap<GlobalFragmentId, UpstreamFragmentChange>,

    /// The actual locations of the external actors.
    external_locations: ActorLocations,
}
361
/// The information of a fragment, used for parameter passing for
/// `ActorGraphBuildStateInner::add_link`.
struct FragmentLinkNode<'a> {
    /// The ID of the fragment at this end of the link.
    fragment_id: GlobalFragmentId,
    /// The actor IDs of the fragment. `add_link` only compares their count, to check that both
    /// ends of a `NoShuffle` edge have the same number of actors.
    actor_ids: &'a [GlobalActorId],
}
367
368impl ActorGraphBuildStateInner {
369    /// Insert new generated actor and record its location.
370    ///
371    /// The `vnode_bitmap` should be `Some` for the actors of hash-distributed fragments.
372    fn add_actor(
373        &mut self,
374        (fragment_id, actor_id): (GlobalFragmentId, GlobalActorId),
375        actor_alignment_id: ActorAlignmentId,
376        vnode_bitmap: Option<Bitmap>,
377    ) {
378        self.fragment_actor_builders
379            .get_mut(&fragment_id)
380            .expect("should added previously")
381            .actor_builders
382            .try_insert(
383                actor_id,
384                ActorBuilder::new(actor_id, fragment_id, vnode_bitmap),
385            )
386            .unwrap();
387
388        self.building_locations
389            .try_insert(actor_id, actor_alignment_id)
390            .unwrap();
391    }
392
393    /// Record the location of an external actor.
394    fn record_external_location(
395        &mut self,
396        actor_id: GlobalActorId,
397        actor_alignment_id: ActorAlignmentId,
398    ) {
399        self.external_locations
400            .try_insert(actor_id, actor_alignment_id)
401            .unwrap();
402    }
403
404    /// Add the new downstream fragment relation to a fragment.
405    ///
406    /// - If the fragment is to be built, the fragment relation will be added to the fragment actor builder.
407    /// - If the fragment is an external existing fragment, the fragment relation will be added to the external changes.
408    fn add_dispatcher(
409        &mut self,
410        fragment_id: GlobalFragmentId,
411        downstream_fragment_id: GlobalFragmentId,
412        dispatch: DispatchStrategy,
413    ) {
414        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
415            builder
416                .downstreams
417                .try_insert(downstream_fragment_id, dispatch)
418                .unwrap();
419        } else {
420            self.upstream_fragment_changes
421                .entry(fragment_id)
422                .or_default()
423                .add_dispatcher(downstream_fragment_id, dispatch);
424        }
425    }
426
427    /// Add the new upstream for an actor.
428    ///
429    /// - If the actor is to be built, the upstream will be added to the actor builder.
430    /// - If the actor is an external actor, the upstream will be added to the external changes.
431    fn add_upstream(
432        &mut self,
433        fragment_id: GlobalFragmentId,
434        edge_id: EdgeId,
435        upstream_fragment_id: GlobalFragmentId,
436        is_no_shuffle: bool,
437    ) {
438        if let Some(builder) = self.fragment_actor_builders.get_mut(&fragment_id) {
439            builder
440                .upstreams
441                .try_insert(edge_id, (upstream_fragment_id, is_no_shuffle))
442                .unwrap();
443        } else {
444            let EdgeId::DownstreamExternal(edge_id) = edge_id else {
445                unreachable!("edge from internal to external must be `DownstreamExternal`")
446            };
447            self.downstream_fragment_changes
448                .entry(fragment_id)
449                .or_default()
450                .add_upstream(edge_id, upstream_fragment_id);
451        }
452    }
453
454    /// Add a "link" between two fragments in the graph.
455    ///
456    /// The `edge` will be transformed into the fragment relation (downstream - upstream) pair between two fragments,
457    /// based on the distribution and the dispatch strategy. They will be
458    /// finally transformed to `Dispatcher` and `Merge` nodes when building the actors.
459    ///
460    /// If there're existing (external) fragments, the info will be recorded in `upstream_fragment_changes` and `downstream_fragment_changes`,
461    /// instead of the actor builders.
462    fn add_link<'a>(
463        &mut self,
464        upstream: FragmentLinkNode<'a>,
465        downstream: FragmentLinkNode<'a>,
466        edge: &'a StreamFragmentEdge,
467    ) {
468        let dt = edge.dispatch_strategy.r#type();
469
470        match dt {
471            // For `NoShuffle`, make n "1-1" links between the actors.
472            DispatcherType::NoShuffle => {
473                assert_eq!(upstream.actor_ids.len(), downstream.actor_ids.len());
474
475                // Create a new dispatcher just between these two actors.
476                self.add_dispatcher(
477                    upstream.fragment_id,
478                    downstream.fragment_id,
479                    edge.dispatch_strategy.clone(),
480                );
481
482                // Also record the upstream for the downstream actor.
483                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, true);
484            }
485
486            // Otherwise, make m * n links between the actors.
487            DispatcherType::Hash | DispatcherType::Broadcast | DispatcherType::Simple => {
488                self.add_dispatcher(
489                    upstream.fragment_id,
490                    downstream.fragment_id,
491                    edge.dispatch_strategy.clone(),
492                );
493                self.add_upstream(downstream.fragment_id, edge.id, upstream.fragment_id, false);
494            }
495
496            DispatcherType::Unspecified => unreachable!(),
497        }
498    }
499}
500
/// The mutable state of building an actor graph. See [`ActorGraphBuildStateInner`].
///
/// On top of the inner state, it tracks the actor IDs of each fragment and hands out actor IDs
/// from the pre-allocated ID generator.
struct ActorGraphBuildState {
    /// The actual state.
    inner: ActorGraphBuildStateInner,

    /// The actor IDs of each fragment.
    fragment_actors: HashMap<GlobalFragmentId, Vec<GlobalActorId>>,

    /// The next local actor id to use. Starts at 0 and is incremented for every ID handed out.
    next_local_id: u32,

    /// The global actor id generator, which maps a local id to a global actor id.
    actor_id_gen: GlobalActorIdGen,
}
515
516impl ActorGraphBuildState {
517    /// Create an empty state with the given id generator.
518    fn new(actor_id_gen: GlobalActorIdGen) -> Self {
519        Self {
520            inner: Default::default(),
521            fragment_actors: Default::default(),
522            next_local_id: 0,
523            actor_id_gen,
524        }
525    }
526
527    /// Get the next global actor id.
528    fn next_actor_id(&mut self) -> GlobalActorId {
529        let local_id = self.next_local_id;
530        self.next_local_id += 1;
531
532        self.actor_id_gen.to_global_id(local_id)
533    }
534
535    /// Finish the build and return the inner state.
536    fn finish(self) -> ActorGraphBuildStateInner {
537        // Assert that all the actors are built.
538        assert_eq!(self.actor_id_gen.len(), self.next_local_id);
539
540        self.inner
541    }
542}
543
/// The result of a built actor graph. Will be further embedded into the `Context` for building
/// actors on the compute nodes.
pub struct ActorGraphBuildResult {
    /// The graph of sealed fragments, including all actors, indexed by fragment ID.
    pub graph: BTreeMap<FragmentId, Fragment>,
    /// The downstream fragments of the fragments from the new graph to be created.
    /// Including the fragment relation to external downstream fragment.
    pub downstream_fragment_relations: FragmentDownstreamRelation,

    /// The scheduled locations of the actors to be built.
    pub building_locations: Locations,

    /// The new dispatchers to be added to the upstream mview actors. Used for MV on MV.
    pub upstream_fragment_downstreams: FragmentDownstreamRelation,

    /// The updates to be applied to the downstream fragment merge node. Used for schema change (replace
    /// table plan).
    pub replace_upstream: FragmentReplaceUpstream,
}
563
/// [`ActorGraphBuilder`] builds the actor graph for the given complete fragment graph, based on the
/// current cluster info and the required parallelism.
#[derive(Debug)]
pub struct ActorGraphBuilder {
    /// The pre-scheduled distribution for each building fragment, as returned by the scheduler.
    distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The actual distribution for each existing fragment, as reported by the fragment graph.
    existing_distributions: HashMap<GlobalFragmentId, Distribution>,

    /// The complete fragment graph.
    fragment_graph: CompleteStreamFragmentGraph,

    /// The cluster info for creating a streaming job.
    cluster_info: StreamingClusterInfo,
}
580
581impl ActorGraphBuilder {
582    /// Create a new actor graph builder with the given "complete" graph. Returns an error if the
583    /// graph is failed to be scheduled.
584    pub fn new(
585        streaming_job_id: JobId,
586        resource_group: String,
587        fragment_graph: CompleteStreamFragmentGraph,
588        cluster_info: StreamingClusterInfo,
589        default_parallelism: NonZeroUsize,
590    ) -> MetaResult<Self> {
591        let expected_vnode_count = fragment_graph.max_parallelism();
592        let existing_distributions = fragment_graph.existing_distribution();
593
594        let schedulable_workers = cluster_info.filter_workers_by_resource_group(&resource_group);
595
596        let scheduler = schedule::Scheduler::new(
597            streaming_job_id,
598            &schedulable_workers,
599            default_parallelism,
600            expected_vnode_count,
601        )?;
602
603        let distributions = scheduler.schedule(&fragment_graph)?;
604
605        // Fill the vnode count for each internal table, based on schedule result.
606        let mut fragment_graph = fragment_graph;
607        for (id, fragment) in fragment_graph.building_fragments_mut() {
608            let mut error = None;
609            let fragment_vnode_count = distributions[id].vnode_count();
610            visit_tables(fragment, |table, _| {
611                if error.is_some() {
612                    return;
613                }
614                // There are special cases where a hash-distributed fragment contains singleton
615                // internal tables, e.g., the state table of `Source` executors.
616                let vnode_count = if table.is_singleton() {
617                    if fragment_vnode_count > 1 {
618                        tracing::info!(
619                            table.name,
620                            "found singleton table in hash-distributed fragment"
621                        );
622                    }
623                    1
624                } else {
625                    fragment_vnode_count
626                };
627                match table.vnode_count_inner().value_opt() {
628                    // Vnode count of this table is not set to placeholder, meaning that we are replacing
629                    // a streaming job, and the existing state table requires a specific vnode count.
630                    // Check if it's the same with what we derived from the schedule result.
631                    //
632                    // Typically, inconsistency should not happen as we force to align the vnode count
633                    // when planning the new streaming job in the frontend.
634                    Some(required_vnode_count) if required_vnode_count != vnode_count => {
635                        error = Some(format!(
636                            "failed to align vnode count for table {}({}): required {}, but got {}",
637                            table.id, table.name, required_vnode_count, vnode_count
638                        ));
639                    }
640                    // Normal cases.
641                    _ => table.maybe_vnode_count = VnodeCount::set(vnode_count).to_protobuf(),
642                }
643            });
644            if let Some(error) = error {
645                bail!(error);
646            }
647        }
648
649        Ok(Self {
650            distributions,
651            existing_distributions,
652            fragment_graph,
653            cluster_info,
654        })
655    }
656
657    /// Get the distribution of the given fragment. Will look up the distribution map of both the
658    /// building and existing fragments.
659    fn get_distribution(&self, fragment_id: GlobalFragmentId) -> &Distribution {
660        self.distributions
661            .get(&fragment_id)
662            .or_else(|| self.existing_distributions.get(&fragment_id))
663            .unwrap()
664    }
665
666    /// Convert the actor location map to the [`Locations`] struct.
667    fn build_locations(&self, actor_locations: ActorLocations) -> Locations {
668        let actor_locations = actor_locations
669            .into_iter()
670            .map(|(id, alignment_id)| (id.as_global_id(), alignment_id))
671            .collect();
672
673        let worker_locations = self
674            .cluster_info
675            .worker_nodes
676            .iter()
677            .map(|(id, node)| (*id, node.clone()))
678            .collect();
679
680        Locations {
681            actor_locations,
682            worker_locations,
683        }
684    }
685
686    /// Build a stream graph by duplicating each fragment as parallel actors. Returns
687    /// [`ActorGraphBuildResult`] that will be further used to build actors on the compute nodes.
688    pub fn generate_graph(
689        self,
690        env: &MetaSrvEnv,
691        job: &StreamingJob,
692        ctx: StreamContext,
693    ) -> MetaResult<ActorGraphBuildResult> {
694        // Pre-generate IDs for all actors.
695        let actor_len = self
696            .distributions
697            .values()
698            .map(|d| d.parallelism())
699            .sum::<usize>() as u64;
700        let id_gen = GlobalActorIdGen::new(env.actor_id_generator(), actor_len);
701
702        // Build the actor graph and get the final state.
703        let ActorGraphBuildStateInner {
704            fragment_actor_builders,
705            building_locations,
706            downstream_fragment_changes,
707            upstream_fragment_changes,
708            ..
709        } = self.build_actor_graph(id_gen)?;
710
711        let mut downstream_fragment_relations: FragmentDownstreamRelation = HashMap::new();
712        // Serialize the graph into a map of sealed fragments.
713        let graph = {
714            let mut fragment_actors: HashMap<GlobalFragmentId, (StreamNode, Vec<StreamActor>)> =
715                HashMap::new();
716
717            // As all fragments are processed, we can now `build` the actors where the `Exchange`
718            // and `Chain` are rewritten.
719            for (fragment_id, builder) in fragment_actor_builders {
720                let global_fragment_id = fragment_id.as_global_id();
721                let node = builder.rewrite()?;
722                downstream_fragment_relations
723                    .try_insert(
724                        global_fragment_id,
725                        builder
726                            .downstreams
727                            .into_iter()
728                            .map(|(id, dispatch)| (id.as_global_id(), dispatch).into())
729                            .collect(),
730                    )
731                    .expect("non-duplicate");
732                fragment_actors
733                    .try_insert(
734                        fragment_id,
735                        (
736                            node,
737                            builder
738                                .actor_builders
739                                .into_values()
740                                .map(|builder| builder.build(job, &ctx))
741                                .try_collect()?,
742                        ),
743                    )
744                    .expect("non-duplicate");
745            }
746
747            {
748                fragment_actors
749                    .into_iter()
750                    .map(|(fragment_id, (stream_node, actors))| {
751                        let distribution = self.distributions[&fragment_id].clone();
752                        let fragment = self.fragment_graph.seal_fragment(
753                            fragment_id,
754                            actors,
755                            distribution,
756                            stream_node,
757                        );
758                        let fragment_id = fragment_id.as_global_id();
759                        (fragment_id, fragment)
760                    })
761                    .collect()
762            }
763        };
764
765        // Convert the actor location map to the `Locations` struct.
766        let building_locations = self.build_locations(building_locations);
767
768        // Extract the new fragment relation from the external changes.
769        let upstream_fragment_downstreams = upstream_fragment_changes
770            .into_iter()
771            .map(|(fragment_id, changes)| {
772                (
773                    fragment_id.as_global_id(),
774                    changes
775                        .new_downstreams
776                        .into_iter()
777                        .map(|(downstream_fragment_id, new_dispatch)| {
778                            (downstream_fragment_id.as_global_id(), new_dispatch).into()
779                        })
780                        .collect(),
781                )
782            })
783            .collect();
784
785        // Extract the updates for merge executors from the external changes.
786        let replace_upstream = downstream_fragment_changes
787            .into_iter()
788            .map(|(fragment_id, changes)| {
789                let fragment_id = fragment_id.as_global_id();
790                (
791                    fragment_id,
792                    changes
793                        .new_upstreams
794                        .into_iter()
795                        .map(move |(edge_id, upstream_fragment_id)| {
796                            let upstream_fragment_id = upstream_fragment_id.as_global_id();
797                            let DownstreamExternalEdgeId {
798                                original_upstream_fragment_id,
799                                ..
800                            } = edge_id;
801                            (
802                                original_upstream_fragment_id.as_global_id(),
803                                upstream_fragment_id,
804                            )
805                        })
806                        .collect(),
807                )
808            })
809            .filter(|(_, fragment_changes): &(_, HashMap<_, _>)| !fragment_changes.is_empty())
810            .collect();
811
812        Ok(ActorGraphBuildResult {
813            graph,
814            downstream_fragment_relations,
815            building_locations,
816            upstream_fragment_downstreams,
817            replace_upstream,
818        })
819    }
820
821    /// Build actor graph for each fragment, using topological order.
822    fn build_actor_graph(&self, id_gen: GlobalActorIdGen) -> MetaResult<ActorGraphBuildStateInner> {
823        let mut state = ActorGraphBuildState::new(id_gen);
824
825        // Use topological sort to build the graph from downstream to upstream. (The first fragment
826        // popped out from the heap will be the top-most node in plan, or the sink in stream graph.)
827        for fragment_id in self.fragment_graph.topo_order()? {
828            self.build_actor_graph_fragment(fragment_id, &mut state)?;
829        }
830
831        Ok(state.finish())
832    }
833
834    /// Build actor graph for a specific fragment.
835    fn build_actor_graph_fragment(
836        &self,
837        fragment_id: GlobalFragmentId,
838        state: &mut ActorGraphBuildState,
839    ) -> MetaResult<()> {
840        let current_fragment = self.fragment_graph.get_fragment(fragment_id);
841        let distribution = self.get_distribution(fragment_id);
842
843        // First, add or record the actors for the current fragment into the state.
844        let actor_ids = match current_fragment {
845            // For building fragments, we need to generate the actor builders.
846            EitherFragment::Building(current_fragment) => {
847                let node = current_fragment.node.clone().unwrap();
848                state
849                    .inner
850                    .fragment_actor_builders
851                    .try_insert(fragment_id, FragmentActorBuilder::new(fragment_id, node))
852                    .expect("non-duplicate");
853                let bitmaps = distribution.as_hash().map(|m| m.to_bitmaps());
854
855                distribution
856                    .actors()
857                    .map(|alignment_id| {
858                        let actor_id = state.next_actor_id();
859                        let vnode_bitmap = bitmaps
860                            .as_ref()
861                            .map(|m: &HashMap<ActorAlignmentId, Bitmap>| &m[&alignment_id])
862                            .cloned();
863
864                        state
865                            .inner
866                            .add_actor((fragment_id, actor_id), alignment_id, vnode_bitmap);
867
868                        actor_id
869                    })
870                    .collect_vec()
871            }
872
873            // For existing fragments, we only need to record the actor locations.
874            EitherFragment::Existing(existing_fragment) => existing_fragment
875                .actors
876                .iter()
877                .map(|(actor_id, actor_info)| {
878                    let actor_id = GlobalActorId::new(*actor_id);
879                    let alignment_id = match &distribution {
880                        Distribution::Singleton(worker_id) => {
881                            ActorAlignmentId::new_single(*worker_id)
882                        }
883                        Distribution::Hash(mapping) => mapping
884                            .get_matched(actor_info.vnode_bitmap.as_ref().unwrap())
885                            .unwrap(),
886                    };
887
888                    state.inner.record_external_location(actor_id, alignment_id);
889
890                    actor_id
891                })
892                .collect_vec(),
893        };
894
895        // Then, add links between the current fragment and its downstream fragments.
896        for (downstream_fragment_id, edge) in self.fragment_graph.get_downstreams(fragment_id) {
897            let downstream_actors = state
898                .fragment_actors
899                .get(&downstream_fragment_id)
900                .expect("downstream fragment not processed yet");
901
902            state.inner.add_link(
903                FragmentLinkNode {
904                    fragment_id,
905                    actor_ids: &actor_ids,
906                },
907                FragmentLinkNode {
908                    fragment_id: downstream_fragment_id,
909                    actor_ids: downstream_actors,
910                },
911                edge,
912            );
913        }
914
915        // Finally, record the actor IDs for the current fragment.
916        state
917            .fragment_actors
918            .try_insert(fragment_id, actor_ids)
919            .unwrap_or_else(|_| panic!("fragment {:?} is already processed", fragment_id));
920
921        Ok(())
922    }
923}