risingwave_meta/barrier/
edge_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::stream_plan::StreamNode;
22use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
23use tracing::warn;
24
25use crate::barrier::rpc::ControlStreamManager;
26use crate::controller::fragment::InflightFragmentInfo;
27use crate::controller::utils::compose_dispatchers;
28use crate::model::{
29    ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
30    FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
31};
32
33#[derive(Debug)]
34struct FragmentInfo {
35    distribution_type: DistributionType,
36    actors: HashMap<ActorId, Option<Bitmap>>,
37    actor_location: HashMap<ActorId, HostAddress>,
38}
39
40pub(super) struct FragmentEdgeBuildResult {
41    pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
42    pub(super) dispatchers: FragmentActorDispatchers,
43    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
44}
45
46impl FragmentEdgeBuildResult {
47    pub(super) fn collect_actors_to_create(
48        &mut self,
49        actors: impl Iterator<
50            Item = (
51                FragmentId,
52                &StreamNode,
53                impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
54            ),
55        >,
56    ) -> StreamJobActorsToCreate {
57        let mut actors_to_create = StreamJobActorsToCreate::default();
58        for (fragment_id, node, actors) in actors {
59            for (actor, worker_id) in actors {
60                let upstreams = self
61                    .upstreams
62                    .get_mut(&fragment_id)
63                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
64                    .unwrap_or_default();
65                let dispatchers = self
66                    .dispatchers
67                    .get_mut(&fragment_id)
68                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
69                    .unwrap_or_default();
70                actors_to_create
71                    .entry(worker_id)
72                    .or_default()
73                    .entry(fragment_id)
74                    .or_insert_with(|| (node.clone(), vec![]))
75                    .1
76                    .push((actor.clone(), upstreams, dispatchers))
77            }
78        }
79        actors_to_create
80    }
81}
82
83pub(super) struct FragmentEdgeBuilder {
84    fragments: HashMap<FragmentId, FragmentInfo>,
85    result: FragmentEdgeBuildResult,
86}
87
88impl FragmentEdgeBuilder {
89    pub(super) fn new(
90        fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
91        control_stream_manager: &ControlStreamManager,
92    ) -> Self {
93        let mut fragments = HashMap::new();
94        for info in fragment_infos {
95            fragments
96                .try_insert(info.fragment_id, {
97                    let (actors, actor_location) = info
98                        .actors
99                        .iter()
100                        .map(|(actor_id, actor)| {
101                            (
102                                (*actor_id, actor.vnode_bitmap.clone()),
103                                (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
104                            )
105                        })
106                        .unzip();
107                    FragmentInfo {
108                        distribution_type: info.distribution_type,
109                        actors,
110                        actor_location,
111                    }
112                })
113                .expect("non-duplicate");
114        }
115        Self {
116            fragments,
117            result: FragmentEdgeBuildResult {
118                upstreams: Default::default(),
119                dispatchers: Default::default(),
120                merge_updates: Default::default(),
121            },
122        }
123    }
124
125    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
126        for (fragment_id, relations) in relations {
127            for relation in relations {
128                self.add_edge(*fragment_id, relation);
129            }
130        }
131    }
132
133    fn add_edge(&mut self, fragment_id: FragmentId, downstream: &DownstreamFragmentRelation) {
134        let fragment = &self
135            .fragments
136            .get(&fragment_id)
137            .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
138        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
139        let dispatchers = compose_dispatchers(
140            fragment.distribution_type,
141            &fragment.actors,
142            downstream.downstream_fragment_id,
143            downstream_fragment.distribution_type,
144            &downstream_fragment.actors,
145            downstream.dispatcher_type,
146            downstream.dist_key_indices.clone(),
147            downstream.output_indices.clone(),
148        );
149        let downstream_fragment_upstreams = self
150            .result
151            .upstreams
152            .entry(downstream.downstream_fragment_id)
153            .or_default();
154        for (actor_id, dispatcher) in dispatchers {
155            let actor_location = &fragment.actor_location[&actor_id];
156            for downstream_actor in &dispatcher.downstream_actor_id {
157                downstream_fragment_upstreams
158                    .entry(*downstream_actor)
159                    .or_default()
160                    .entry(fragment_id)
161                    .or_default()
162                    .insert(
163                        actor_id,
164                        ActorInfo {
165                            actor_id,
166                            host: Some(actor_location.clone()),
167                        },
168                    );
169            }
170            self.result
171                .dispatchers
172                .entry(fragment_id)
173                .or_default()
174                .entry(actor_id)
175                .or_default()
176                .push(dispatcher);
177        }
178    }
179
180    pub(super) fn replace_upstream(
181        &mut self,
182        fragment_id: FragmentId,
183        original_upstream_fragment_id: FragmentId,
184        new_upstream_fragment_id: FragmentId,
185    ) {
186        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
187        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
188            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
189                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
190                    fragment_merge_updates.push(MergeUpdate {
191                        actor_id,
192                        upstream_fragment_id: original_upstream_fragment_id,
193                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
194                        added_upstream_actors: new_upstreams.into_values().collect(),
195                        removed_upstream_actor_id: vec![],
196                    })
197                } else if cfg!(debug_assertions) {
198                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
199                } else {
200                    warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
201                }
202                !actor_upstreams.is_empty()
203            })
204        } else if cfg!(debug_assertions) {
205            panic!(
206                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
207                fragment_id,
208                new_upstream_fragment_id,
209                original_upstream_fragment_id,
210                self.result.upstreams
211            );
212        } else {
213            warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
214        }
215    }
216
217    pub(super) fn build(self) -> FragmentEdgeBuildResult {
218        self.result
219    }
220}