risingwave_meta/barrier/
edge_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::{PartialGraphId, SubscriberId};
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30    ActorId, ActorNewNoShuffle, ActorUpstreams, DownstreamFragmentRelation, Fragment,
31    FragmentActorDispatchers, FragmentDownstreamRelation, FragmentId, StreamActor,
32    StreamJobActorsToCreate,
33};
34
35/// Fragment information needed by [`FragmentEdgeBuilder`] to compute dispatchers and merge nodes.
36///
37/// Contains actor bitmaps and resolved host addresses.
38#[derive(Debug)]
39pub(super) struct EdgeBuilderFragmentInfo {
40    distribution_type: DistributionType,
41    actors: HashMap<ActorId, Option<Bitmap>>,
42    actor_location: HashMap<ActorId, HostAddress>,
43    partial_graph_id: PartialGraphId,
44}
45
46impl EdgeBuilderFragmentInfo {
47    /// Build from an already-inflight fragment (actors already materialized).
48    pub fn from_inflight(
49        info: &InflightFragmentInfo,
50        partial_graph_id: PartialGraphId,
51        control_stream_manager: &ControlStreamManager,
52    ) -> Self {
53        let (actors, actor_location) = info
54            .actors
55            .iter()
56            .map(|(&actor_id, actor)| {
57                (
58                    (actor_id, actor.vnode_bitmap.clone()),
59                    (actor_id, control_stream_manager.host_addr(actor.worker_id)),
60                )
61            })
62            .unzip();
63        Self {
64            distribution_type: info.distribution_type,
65            actors,
66            actor_location,
67            partial_graph_id,
68        }
69    }
70
71    /// Build from a model `Fragment` with separately provided actors and locations.
72    pub fn from_fragment(
73        fragment: &Fragment,
74        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
75        actor_worker: &HashMap<ActorId, WorkerId>,
76        partial_graph_id: PartialGraphId,
77        control_stream_manager: &ControlStreamManager,
78    ) -> Self {
79        let (actors, actor_location) = stream_actors
80            .get(&fragment.fragment_id)
81            .into_iter()
82            .flatten()
83            .map(|actor| {
84                (
85                    (actor.actor_id, actor.vnode_bitmap.clone()),
86                    (
87                        actor.actor_id,
88                        control_stream_manager.host_addr(actor_worker[&actor.actor_id]),
89                    ),
90                )
91            })
92            .unzip();
93        Self {
94            distribution_type: fragment.distribution_type.into(),
95            actors,
96            actor_location,
97            partial_graph_id,
98        }
99    }
100}
101
102#[derive(Debug)]
103pub(super) struct FragmentEdgeBuildResult {
104    upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
105    pub(super) dispatchers: FragmentActorDispatchers,
106    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
107    actor_new_no_shuffle: ActorNewNoShuffle,
108}
109
110impl FragmentEdgeBuildResult {
111    pub(super) fn actor_new_no_shuffle(&self) -> &ActorNewNoShuffle {
112        &self.actor_new_no_shuffle
113    }
114
115    pub(super) fn collect_actors_to_create(
116        &mut self,
117        actors: impl Iterator<
118            Item = (
119                FragmentId,
120                &StreamNode,
121                impl Iterator<Item = (&StreamActor, WorkerId)>,
122                impl IntoIterator<Item = SubscriberId>,
123            ),
124        >,
125    ) -> StreamJobActorsToCreate {
126        let mut actors_to_create = StreamJobActorsToCreate::default();
127        for (fragment_id, node, actors, subscriber_ids) in actors {
128            let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
129            for (actor, worker_id) in actors {
130                let upstreams = self
131                    .upstreams
132                    .get_mut(&fragment_id)
133                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
134                    .unwrap_or_default();
135                let dispatchers = self
136                    .dispatchers
137                    .get_mut(&fragment_id)
138                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
139                    .unwrap_or_default();
140                actors_to_create
141                    .entry(worker_id)
142                    .or_default()
143                    .entry(fragment_id)
144                    .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
145                    .1
146                    .push((actor.clone(), upstreams, dispatchers))
147            }
148        }
149        actors_to_create
150    }
151
152    pub(super) fn is_empty(&self) -> bool {
153        self.merge_updates
154            .values()
155            .all(|updates| updates.is_empty())
156            && self.dispatchers.values().all(|dispatchers| {
157                dispatchers
158                    .values()
159                    .all(|dispatchers| dispatchers.is_empty())
160            })
161            && self
162                .merge_updates
163                .values()
164                .all(|updates| updates.is_empty())
165    }
166}
167
168pub(super) struct FragmentEdgeBuilder {
169    fragments: HashMap<FragmentId, EdgeBuilderFragmentInfo>,
170    result: FragmentEdgeBuildResult,
171}
172
173impl FragmentEdgeBuilder {
174    /// Create a new edge builder from fragment edge information.
175    pub(super) fn new(
176        fragment_infos: impl IntoIterator<Item = (FragmentId, EdgeBuilderFragmentInfo)>,
177    ) -> Self {
178        let mut fragments = HashMap::new();
179        for (fragment_id, info) in fragment_infos {
180            fragments
181                .try_insert(fragment_id, info)
182                .expect("non-duplicate");
183        }
184        Self {
185            fragments,
186            result: FragmentEdgeBuildResult {
187                upstreams: Default::default(),
188                dispatchers: Default::default(),
189                merge_updates: Default::default(),
190                actor_new_no_shuffle: Default::default(),
191            },
192        }
193    }
194
195    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
196        for (fragment_id, relations) in relations {
197            for relation in relations {
198                self.add_edge(*fragment_id, relation);
199            }
200        }
201    }
202
203    pub(super) fn add_edge(
204        &mut self,
205        fragment_id: FragmentId,
206        downstream: &DownstreamFragmentRelation,
207    ) {
208        let Some(fragment) = &self.fragments.get(&fragment_id) else {
209            if self
210                .fragments
211                .contains_key(&downstream.downstream_fragment_id)
212            {
213                panic!(
214                    "cannot find fragment {} with downstream {:?}",
215                    fragment_id, downstream
216                )
217            } else {
218                // ignore fragment relation with both upstream and downstream not in the set of fragments
219                return;
220            }
221        };
222        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
223        let (dispatchers, no_shuffle_map) = compose_dispatchers(
224            fragment.distribution_type,
225            &fragment.actors,
226            downstream.downstream_fragment_id,
227            downstream_fragment.distribution_type,
228            &downstream_fragment.actors,
229            downstream.dispatcher_type,
230            downstream.dist_key_indices.clone(),
231            downstream.output_mapping.clone(),
232        );
233        if let Some(no_shuffle_map) = no_shuffle_map {
234            self.result
235                .actor_new_no_shuffle
236                .entry(fragment_id)
237                .or_default()
238                .insert(downstream.downstream_fragment_id, no_shuffle_map);
239        }
240        let downstream_fragment_upstreams = self
241            .result
242            .upstreams
243            .entry(downstream.downstream_fragment_id)
244            .or_default();
245        for (actor_id, dispatcher) in dispatchers {
246            let actor_location = &fragment.actor_location[&actor_id];
247            for downstream_actor in &dispatcher.downstream_actor_id {
248                downstream_fragment_upstreams
249                    .entry(*downstream_actor)
250                    .or_default()
251                    .entry(fragment_id)
252                    .or_default()
253                    .insert(
254                        actor_id,
255                        ActorInfo {
256                            actor_id,
257                            host: Some(actor_location.clone()),
258                            partial_graph_id: fragment.partial_graph_id,
259                        },
260                    );
261            }
262            self.result
263                .dispatchers
264                .entry(fragment_id)
265                .or_default()
266                .entry(actor_id)
267                .or_default()
268                .push(dispatcher);
269        }
270    }
271
272    pub(super) fn replace_upstream(
273        &mut self,
274        fragment_id: FragmentId,
275        original_upstream_fragment_id: FragmentId,
276        new_upstream_fragment_id: FragmentId,
277    ) {
278        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
279        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
280            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
281                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
282                    fragment_merge_updates.push(MergeUpdate {
283                        actor_id,
284                        upstream_fragment_id: original_upstream_fragment_id,
285                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
286                        added_upstream_actors: new_upstreams.into_values().collect(),
287                        removed_upstream_actor_id: vec![],
288                    })
289                } else if cfg!(debug_assertions) {
290                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
291                } else {
292                    warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
293                }
294                !actor_upstreams.is_empty()
295            })
296        } else if cfg!(debug_assertions) {
297            panic!(
298                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
299                fragment_id,
300                new_upstream_fragment_id,
301                original_upstream_fragment_id,
302                self.result.upstreams
303            );
304        } else {
305            warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
306        }
307    }
308
309    /// Finalize the builder, returning the edge build result.
310    pub(super) fn build(self) -> FragmentEdgeBuildResult {
311        self.result
312    }
313}