risingwave_meta/barrier/
edge_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::{PartialGraphId, SubscriberId};
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30    ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
31    FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
32};
33
/// Per-fragment snapshot captured by `FragmentEdgeBuilder::new` and read when composing edges.
#[derive(Debug)]
struct FragmentInfo {
    /// Copied from `InflightFragmentInfo::distribution_type`; passed to `compose_dispatchers`.
    distribution_type: DistributionType,
    /// Vnode bitmap (cloned from the in-flight actor info) of each actor, keyed by actor id.
    actors: HashMap<ActorId, Option<Bitmap>>,
    /// Host address of each actor's worker, resolved via `ControlStreamManager::host_addr`.
    actor_location: HashMap<ActorId, HostAddress>,
    /// Partial graph id supplied alongside the fragment; stamped into the `ActorInfo` of this
    /// fragment's actors when they are registered as upstreams of another fragment.
    partial_graph_id: PartialGraphId,
}
41
/// Edges accumulated by `FragmentEdgeBuilder`.
#[derive(Debug)]
pub(super) struct FragmentEdgeBuildResult {
    /// Downstream fragment id -> downstream actor id -> that actor's upstreams, which in turn are
    /// keyed by upstream fragment id and upstream actor id (filled by `add_edge`).
    pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
    /// Upstream fragment id -> upstream actor id -> dispatchers composed for that actor.
    pub(super) dispatchers: FragmentActorDispatchers,
    /// Fragment id -> merge updates produced by `replace_upstream` for its actors.
    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
}
48
49impl FragmentEdgeBuildResult {
50    pub(super) fn collect_actors_to_create(
51        &mut self,
52        actors: impl Iterator<
53            Item = (
54                FragmentId,
55                &StreamNode,
56                impl Iterator<Item = (&StreamActor, WorkerId)>,
57                impl IntoIterator<Item = SubscriberId>,
58            ),
59        >,
60    ) -> StreamJobActorsToCreate {
61        let mut actors_to_create = StreamJobActorsToCreate::default();
62        for (fragment_id, node, actors, subscriber_ids) in actors {
63            let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
64            for (actor, worker_id) in actors {
65                let upstreams = self
66                    .upstreams
67                    .get_mut(&fragment_id)
68                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
69                    .unwrap_or_default();
70                let dispatchers = self
71                    .dispatchers
72                    .get_mut(&fragment_id)
73                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
74                    .unwrap_or_default();
75                actors_to_create
76                    .entry(worker_id)
77                    .or_default()
78                    .entry(fragment_id)
79                    .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
80                    .1
81                    .push((actor.clone(), upstreams, dispatchers))
82            }
83        }
84        actors_to_create
85    }
86
87    pub(super) fn is_empty(&self) -> bool {
88        self.merge_updates
89            .values()
90            .all(|updates| updates.is_empty())
91            && self.dispatchers.values().all(|dispatchers| {
92                dispatchers
93                    .values()
94                    .all(|dispatchers| dispatchers.is_empty())
95            })
96            && self
97                .merge_updates
98                .values()
99                .all(|updates| updates.is_empty())
100    }
101}
102
/// Accumulates dispatcher / upstream / merge-update edges between a fixed set of fragments.
pub(super) struct FragmentEdgeBuilder {
    /// Fragments known to this builder, keyed by fragment id (populated once in `new`).
    fragments: HashMap<FragmentId, FragmentInfo>,
    /// Edges accumulated so far; returned by `build`.
    result: FragmentEdgeBuildResult,
}
107
108impl FragmentEdgeBuilder {
109    pub(super) fn new(
110        fragment_infos: impl Iterator<Item = (&InflightFragmentInfo, PartialGraphId)>,
111        control_stream_manager: &ControlStreamManager,
112    ) -> Self {
113        let mut fragments = HashMap::new();
114        for (info, partial_graph_id) in fragment_infos {
115            fragments
116                .try_insert(info.fragment_id, {
117                    let (actors, actor_location) = info
118                        .actors
119                        .iter()
120                        .map(|(actor_id, actor)| {
121                            (
122                                (*actor_id, actor.vnode_bitmap.clone()),
123                                (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
124                            )
125                        })
126                        .unzip();
127                    FragmentInfo {
128                        distribution_type: info.distribution_type,
129                        actors,
130                        actor_location,
131                        partial_graph_id,
132                    }
133                })
134                .expect("non-duplicate");
135        }
136        Self {
137            fragments,
138            result: FragmentEdgeBuildResult {
139                upstreams: Default::default(),
140                dispatchers: Default::default(),
141                merge_updates: Default::default(),
142            },
143        }
144    }
145
146    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
147        for (fragment_id, relations) in relations {
148            for relation in relations {
149                self.add_edge(*fragment_id, relation);
150            }
151        }
152    }
153
154    pub(super) fn add_edge(
155        &mut self,
156        fragment_id: FragmentId,
157        downstream: &DownstreamFragmentRelation,
158    ) {
159        let Some(fragment) = &self.fragments.get(&fragment_id) else {
160            if self
161                .fragments
162                .contains_key(&downstream.downstream_fragment_id)
163            {
164                panic!(
165                    "cannot find fragment {} with downstream {:?}",
166                    fragment_id, downstream
167                )
168            } else {
169                // ignore fragment relation with both upstream and downstream not in the set of fragments
170                return;
171            }
172        };
173        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
174        let dispatchers = compose_dispatchers(
175            fragment.distribution_type,
176            &fragment.actors,
177            downstream.downstream_fragment_id,
178            downstream_fragment.distribution_type,
179            &downstream_fragment.actors,
180            downstream.dispatcher_type,
181            downstream.dist_key_indices.clone(),
182            downstream.output_mapping.clone(),
183        );
184        let downstream_fragment_upstreams = self
185            .result
186            .upstreams
187            .entry(downstream.downstream_fragment_id)
188            .or_default();
189        for (actor_id, dispatcher) in dispatchers {
190            let actor_location = &fragment.actor_location[&actor_id];
191            for downstream_actor in &dispatcher.downstream_actor_id {
192                downstream_fragment_upstreams
193                    .entry(*downstream_actor)
194                    .or_default()
195                    .entry(fragment_id)
196                    .or_default()
197                    .insert(
198                        actor_id,
199                        ActorInfo {
200                            actor_id,
201                            host: Some(actor_location.clone()),
202                            partial_graph_id: fragment.partial_graph_id,
203                        },
204                    );
205            }
206            self.result
207                .dispatchers
208                .entry(fragment_id)
209                .or_default()
210                .entry(actor_id)
211                .or_default()
212                .push(dispatcher);
213        }
214    }
215
216    pub(super) fn replace_upstream(
217        &mut self,
218        fragment_id: FragmentId,
219        original_upstream_fragment_id: FragmentId,
220        new_upstream_fragment_id: FragmentId,
221    ) {
222        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
223        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
224            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
225                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
226                    fragment_merge_updates.push(MergeUpdate {
227                        actor_id,
228                        upstream_fragment_id: original_upstream_fragment_id,
229                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
230                        added_upstream_actors: new_upstreams.into_values().collect(),
231                        removed_upstream_actor_id: vec![],
232                    })
233                } else if cfg!(debug_assertions) {
234                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
235                } else {
236                    warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
237                }
238                !actor_upstreams.is_empty()
239            })
240        } else if cfg!(debug_assertions) {
241            panic!(
242                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
243                fragment_id,
244                new_upstream_fragment_id,
245                original_upstream_fragment_id,
246                self.result.upstreams
247            );
248        } else {
249            warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
250        }
251    }
252
253    pub(super) fn build(self) -> FragmentEdgeBuildResult {
254        self.result
255    }
256}