risingwave_meta/barrier/
edge_builder.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress, WorkerNode};
21use risingwave_pb::id::{PartialGraphId, SubscriberId};
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30    ActorId, ActorNewNoShuffle, ActorUpstreams, DownstreamFragmentRelation, Fragment,
31    FragmentActorDispatchers, FragmentDownstreamRelation, FragmentId, StreamActor,
32    StreamJobActorsToCreate,
33};
34
35/// Fragment information needed by [`FragmentEdgeBuilder`] to compute dispatchers and merge nodes.
36///
37/// Contains actor bitmaps and resolved host addresses.
38#[derive(Debug)]
39pub(super) struct EdgeBuilderFragmentInfo {
40    distribution_type: DistributionType,
41    actors: HashMap<ActorId, Option<Bitmap>>,
42    actor_location: HashMap<ActorId, HostAddress>,
43    partial_graph_id: PartialGraphId,
44}
45
46impl EdgeBuilderFragmentInfo {
47    /// Build from an already-inflight fragment (actors already materialized).
48    pub fn from_inflight(
49        info: &InflightFragmentInfo,
50        partial_graph_id: PartialGraphId,
51        control_stream_manager: &ControlStreamManager,
52    ) -> Self {
53        let (actors, actor_location) = info
54            .actors
55            .iter()
56            .map(|(&actor_id, actor)| {
57                (
58                    (actor_id, actor.vnode_bitmap.clone()),
59                    (actor_id, control_stream_manager.host_addr(actor.worker_id)),
60                )
61            })
62            .unzip();
63        Self {
64            distribution_type: info.distribution_type,
65            actors,
66            actor_location,
67            partial_graph_id,
68        }
69    }
70
71    /// Build from an already-inflight fragment using worker node map for host resolution.
72    ///
73    /// Unlike [`from_inflight`](Self::from_inflight), this does not require a
74    /// `ControlStreamManager` and can be used when only worker node metadata is available
75    /// (e.g., during `render_runtime_info` before the control streams are fully set up).
76    pub fn from_inflight_with_worker_nodes(
77        info: &InflightFragmentInfo,
78        partial_graph_id: PartialGraphId,
79        worker_nodes: &HashMap<WorkerId, WorkerNode>,
80    ) -> Self {
81        let (actors, actor_location) = info
82            .actors
83            .iter()
84            .map(|(&actor_id, actor)| {
85                (
86                    (actor_id, actor.vnode_bitmap.clone()),
87                    (
88                        actor_id,
89                        worker_nodes[&actor.worker_id].host.clone().unwrap(),
90                    ),
91                )
92            })
93            .unzip();
94        Self {
95            distribution_type: info.distribution_type,
96            actors,
97            actor_location,
98            partial_graph_id,
99        }
100    }
101
102    /// Build from a model `Fragment` with separately provided actors and locations.
103    pub fn from_fragment(
104        fragment: &Fragment,
105        stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
106        actor_worker: &HashMap<ActorId, WorkerId>,
107        partial_graph_id: PartialGraphId,
108        control_stream_manager: &ControlStreamManager,
109    ) -> Self {
110        let (actors, actor_location) = stream_actors
111            .get(&fragment.fragment_id)
112            .into_iter()
113            .flatten()
114            .map(|actor| {
115                (
116                    (actor.actor_id, actor.vnode_bitmap.clone()),
117                    (
118                        actor.actor_id,
119                        control_stream_manager.host_addr(actor_worker[&actor.actor_id]),
120                    ),
121                )
122            })
123            .unzip();
124        Self {
125            distribution_type: fragment.distribution_type.into(),
126            actors,
127            actor_location,
128            partial_graph_id,
129        }
130    }
131}
132
133#[derive(Debug)]
134pub(super) struct FragmentEdgeBuildResult {
135    upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
136    pub(super) dispatchers: FragmentActorDispatchers,
137    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
138    actor_new_no_shuffle: ActorNewNoShuffle,
139}
140
141impl FragmentEdgeBuildResult {
142    pub(super) fn actor_new_no_shuffle(&self) -> &ActorNewNoShuffle {
143        &self.actor_new_no_shuffle
144    }
145
146    pub(super) fn collect_actors_to_create(
147        &mut self,
148        actors: impl Iterator<
149            Item = (
150                FragmentId,
151                &StreamNode,
152                impl Iterator<Item = (&StreamActor, WorkerId)>,
153                impl IntoIterator<Item = SubscriberId>,
154            ),
155        >,
156    ) -> StreamJobActorsToCreate {
157        let mut actors_to_create = StreamJobActorsToCreate::default();
158        for (fragment_id, node, actors, subscriber_ids) in actors {
159            let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
160            for (actor, worker_id) in actors {
161                let upstreams = self
162                    .upstreams
163                    .get_mut(&fragment_id)
164                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
165                    .unwrap_or_default();
166                let dispatchers = self
167                    .dispatchers
168                    .get_mut(&fragment_id)
169                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
170                    .unwrap_or_default();
171                actors_to_create
172                    .entry(worker_id)
173                    .or_default()
174                    .entry(fragment_id)
175                    .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
176                    .1
177                    .push((actor.clone(), upstreams, dispatchers))
178            }
179        }
180        actors_to_create
181    }
182
183    pub(super) fn is_empty(&self) -> bool {
184        self.merge_updates
185            .values()
186            .all(|updates| updates.is_empty())
187            && self.dispatchers.values().all(|dispatchers| {
188                dispatchers
189                    .values()
190                    .all(|dispatchers| dispatchers.is_empty())
191            })
192            && self
193                .merge_updates
194                .values()
195                .all(|updates| updates.is_empty())
196    }
197}
198
199pub(super) struct FragmentEdgeBuilder {
200    fragments: HashMap<FragmentId, EdgeBuilderFragmentInfo>,
201    result: FragmentEdgeBuildResult,
202}
203
204impl FragmentEdgeBuilder {
205    /// Create a new edge builder from fragment edge information.
206    pub(super) fn new(
207        fragment_infos: impl IntoIterator<Item = (FragmentId, EdgeBuilderFragmentInfo)>,
208    ) -> Self {
209        let mut fragments = HashMap::new();
210        for (fragment_id, info) in fragment_infos {
211            fragments
212                .try_insert(fragment_id, info)
213                .expect("non-duplicate");
214        }
215        Self {
216            fragments,
217            result: FragmentEdgeBuildResult {
218                upstreams: Default::default(),
219                dispatchers: Default::default(),
220                merge_updates: Default::default(),
221                actor_new_no_shuffle: Default::default(),
222            },
223        }
224    }
225
226    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
227        for (fragment_id, relations) in relations {
228            for relation in relations {
229                self.add_edge(*fragment_id, relation);
230            }
231        }
232    }
233
234    pub(super) fn add_edge(
235        &mut self,
236        fragment_id: FragmentId,
237        downstream: &DownstreamFragmentRelation,
238    ) {
239        let Some(fragment) = &self.fragments.get(&fragment_id) else {
240            if self
241                .fragments
242                .contains_key(&downstream.downstream_fragment_id)
243            {
244                panic!(
245                    "cannot find fragment {} with downstream {:?}",
246                    fragment_id, downstream
247                )
248            } else {
249                // ignore fragment relation with both upstream and downstream not in the set of fragments
250                return;
251            }
252        };
253        let Some(downstream_fragment) = &self.fragments.get(&downstream.downstream_fragment_id)
254        else {
255            // upstream is in the builder but downstream is not (e.g., edge to an independent job's fragment).
256            // Skip this edge.
257            return;
258        };
259        let (dispatchers, no_shuffle_map) = compose_dispatchers(
260            fragment.distribution_type,
261            &fragment.actors,
262            downstream.downstream_fragment_id,
263            downstream_fragment.distribution_type,
264            &downstream_fragment.actors,
265            downstream.dispatcher_type,
266            downstream.dist_key_indices.clone(),
267            downstream.output_mapping.clone(),
268        );
269        if let Some(no_shuffle_map) = no_shuffle_map {
270            self.result
271                .actor_new_no_shuffle
272                .entry(fragment_id)
273                .or_default()
274                .insert(downstream.downstream_fragment_id, no_shuffle_map);
275        }
276        let downstream_fragment_upstreams = self
277            .result
278            .upstreams
279            .entry(downstream.downstream_fragment_id)
280            .or_default();
281        for (actor_id, dispatcher) in dispatchers {
282            let actor_location = &fragment.actor_location[&actor_id];
283            for downstream_actor in &dispatcher.downstream_actor_id {
284                downstream_fragment_upstreams
285                    .entry(*downstream_actor)
286                    .or_default()
287                    .entry(fragment_id)
288                    .or_default()
289                    .insert(
290                        actor_id,
291                        ActorInfo {
292                            actor_id,
293                            host: Some(actor_location.clone()),
294                            partial_graph_id: fragment.partial_graph_id,
295                        },
296                    );
297            }
298            self.result
299                .dispatchers
300                .entry(fragment_id)
301                .or_default()
302                .entry(actor_id)
303                .or_default()
304                .push(dispatcher);
305        }
306    }
307
308    pub(super) fn replace_upstream(
309        &mut self,
310        fragment_id: FragmentId,
311        original_upstream_fragment_id: FragmentId,
312        new_upstream_fragment_id: FragmentId,
313    ) {
314        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
315        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
316            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
317                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
318                    fragment_merge_updates.push(MergeUpdate {
319                        actor_id,
320                        upstream_fragment_id: original_upstream_fragment_id,
321                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
322                        added_upstream_actors: new_upstreams.into_values().collect(),
323                        removed_upstream_actor_id: vec![],
324                    })
325                } else if cfg!(debug_assertions) {
326                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
327                } else {
328                    warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
329                }
330                !actor_upstreams.is_empty()
331            })
332        } else if cfg!(debug_assertions) {
333            panic!(
334                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
335                fragment_id,
336                new_upstream_fragment_id,
337                original_upstream_fragment_id,
338                self.result.upstreams
339            );
340        } else {
341            warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
342        }
343    }
344
345    /// Finalize the builder, returning the edge build result.
346    pub(super) fn build(self) -> FragmentEdgeBuildResult {
347        self.result
348    }
349}