risingwave_meta/barrier/
edge_builder.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::stream_plan::StreamNode;
22use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
23use tracing::warn;
24
25use crate::barrier::rpc::ControlStreamManager;
26use crate::controller::fragment::InflightFragmentInfo;
27use crate::controller::utils::compose_dispatchers;
28use crate::model::{
29    ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
30    FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
31};
32
33#[derive(Debug)]
34struct FragmentInfo {
35    distribution_type: DistributionType,
36    actors: HashMap<ActorId, Option<Bitmap>>,
37    actor_location: HashMap<ActorId, HostAddress>,
38}
39
40pub(super) struct FragmentEdgeBuildResult {
41    pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
42    pub(super) dispatchers: FragmentActorDispatchers,
43    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
44}
45
46impl FragmentEdgeBuildResult {
47    pub(super) fn collect_actors_to_create(
48        &mut self,
49        actors: impl Iterator<
50            Item = (
51                FragmentId,
52                &StreamNode,
53                impl Iterator<Item = (&StreamActor, WorkerId)>,
54                impl IntoIterator<Item = u32>,
55            ),
56        >,
57    ) -> StreamJobActorsToCreate {
58        let mut actors_to_create = StreamJobActorsToCreate::default();
59        for (fragment_id, node, actors, subscriber_ids) in actors {
60            let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
61            for (actor, worker_id) in actors {
62                let upstreams = self
63                    .upstreams
64                    .get_mut(&fragment_id)
65                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
66                    .unwrap_or_default();
67                let dispatchers = self
68                    .dispatchers
69                    .get_mut(&fragment_id)
70                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
71                    .unwrap_or_default();
72                actors_to_create
73                    .entry(worker_id)
74                    .or_default()
75                    .entry(fragment_id)
76                    .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
77                    .1
78                    .push((actor.clone(), upstreams, dispatchers))
79            }
80        }
81        actors_to_create
82    }
83}
84
85pub(super) struct FragmentEdgeBuilder {
86    fragments: HashMap<FragmentId, FragmentInfo>,
87    result: FragmentEdgeBuildResult,
88}
89
90impl FragmentEdgeBuilder {
91    pub(super) fn new(
92        fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
93        control_stream_manager: &ControlStreamManager,
94    ) -> Self {
95        let mut fragments = HashMap::new();
96        for info in fragment_infos {
97            fragments
98                .try_insert(info.fragment_id, {
99                    let (actors, actor_location) = info
100                        .actors
101                        .iter()
102                        .map(|(actor_id, actor)| {
103                            (
104                                (*actor_id, actor.vnode_bitmap.clone()),
105                                (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
106                            )
107                        })
108                        .unzip();
109                    FragmentInfo {
110                        distribution_type: info.distribution_type,
111                        actors,
112                        actor_location,
113                    }
114                })
115                .expect("non-duplicate");
116        }
117        Self {
118            fragments,
119            result: FragmentEdgeBuildResult {
120                upstreams: Default::default(),
121                dispatchers: Default::default(),
122                merge_updates: Default::default(),
123            },
124        }
125    }
126
127    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
128        for (fragment_id, relations) in relations {
129            for relation in relations {
130                self.add_edge(*fragment_id, relation);
131            }
132        }
133    }
134
135    pub(super) fn add_edge(
136        &mut self,
137        fragment_id: FragmentId,
138        downstream: &DownstreamFragmentRelation,
139    ) {
140        let fragment = &self
141            .fragments
142            .get(&fragment_id)
143            .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
144        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
145        let dispatchers = compose_dispatchers(
146            fragment.distribution_type,
147            &fragment.actors,
148            downstream.downstream_fragment_id,
149            downstream_fragment.distribution_type,
150            &downstream_fragment.actors,
151            downstream.dispatcher_type,
152            downstream.dist_key_indices.clone(),
153            downstream.output_mapping.clone(),
154        );
155        let downstream_fragment_upstreams = self
156            .result
157            .upstreams
158            .entry(downstream.downstream_fragment_id)
159            .or_default();
160        for (actor_id, dispatcher) in dispatchers {
161            let actor_location = &fragment.actor_location[&actor_id];
162            for downstream_actor in &dispatcher.downstream_actor_id {
163                downstream_fragment_upstreams
164                    .entry(*downstream_actor)
165                    .or_default()
166                    .entry(fragment_id)
167                    .or_default()
168                    .insert(
169                        actor_id,
170                        ActorInfo {
171                            actor_id,
172                            host: Some(actor_location.clone()),
173                        },
174                    );
175            }
176            self.result
177                .dispatchers
178                .entry(fragment_id)
179                .or_default()
180                .entry(actor_id)
181                .or_default()
182                .push(dispatcher);
183        }
184    }
185
186    pub(super) fn replace_upstream(
187        &mut self,
188        fragment_id: FragmentId,
189        original_upstream_fragment_id: FragmentId,
190        new_upstream_fragment_id: FragmentId,
191    ) {
192        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
193        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
194            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
195                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
196                    fragment_merge_updates.push(MergeUpdate {
197                        actor_id,
198                        upstream_fragment_id: original_upstream_fragment_id,
199                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
200                        added_upstream_actors: new_upstreams.into_values().collect(),
201                        removed_upstream_actor_id: vec![],
202                    })
203                } else if cfg!(debug_assertions) {
204                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
205                } else {
206                    warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
207                }
208                !actor_upstreams.is_empty()
209            })
210        } else if cfg!(debug_assertions) {
211            panic!(
212                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
213                fragment_id,
214                new_upstream_fragment_id,
215                original_upstream_fragment_id,
216                self.result.upstreams
217            );
218        } else {
219            warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
220        }
221    }
222
223    pub(super) fn build(self) -> FragmentEdgeBuildResult {
224        self.result
225    }
226}