risingwave_meta/barrier/
edge_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::SubscriberId;
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30    ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
31    FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
32};
33
34#[derive(Debug)]
35struct FragmentInfo {
36    distribution_type: DistributionType,
37    actors: HashMap<ActorId, Option<Bitmap>>,
38    actor_location: HashMap<ActorId, HostAddress>,
39}
40
41#[derive(Debug)]
42pub(super) struct FragmentEdgeBuildResult {
43    pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
44    pub(super) dispatchers: FragmentActorDispatchers,
45    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
46}
47
48impl FragmentEdgeBuildResult {
49    pub(super) fn collect_actors_to_create(
50        &mut self,
51        actors: impl Iterator<
52            Item = (
53                FragmentId,
54                &StreamNode,
55                impl Iterator<Item = (&StreamActor, WorkerId)>,
56                impl IntoIterator<Item = SubscriberId>,
57            ),
58        >,
59    ) -> StreamJobActorsToCreate {
60        let mut actors_to_create = StreamJobActorsToCreate::default();
61        for (fragment_id, node, actors, subscriber_ids) in actors {
62            let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
63            for (actor, worker_id) in actors {
64                let upstreams = self
65                    .upstreams
66                    .get_mut(&fragment_id)
67                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
68                    .unwrap_or_default();
69                let dispatchers = self
70                    .dispatchers
71                    .get_mut(&fragment_id)
72                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
73                    .unwrap_or_default();
74                actors_to_create
75                    .entry(worker_id)
76                    .or_default()
77                    .entry(fragment_id)
78                    .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
79                    .1
80                    .push((actor.clone(), upstreams, dispatchers))
81            }
82        }
83        actors_to_create
84    }
85
86    pub(super) fn is_empty(&self) -> bool {
87        self.merge_updates
88            .values()
89            .all(|updates| updates.is_empty())
90            && self.dispatchers.values().all(|dispatchers| {
91                dispatchers
92                    .values()
93                    .all(|dispatchers| dispatchers.is_empty())
94            })
95            && self
96                .merge_updates
97                .values()
98                .all(|updates| updates.is_empty())
99    }
100}
101
102pub(super) struct FragmentEdgeBuilder {
103    fragments: HashMap<FragmentId, FragmentInfo>,
104    result: FragmentEdgeBuildResult,
105}
106
107impl FragmentEdgeBuilder {
108    pub(super) fn new(
109        fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
110        control_stream_manager: &ControlStreamManager,
111    ) -> Self {
112        let mut fragments = HashMap::new();
113        for info in fragment_infos {
114            fragments
115                .try_insert(info.fragment_id, {
116                    let (actors, actor_location) = info
117                        .actors
118                        .iter()
119                        .map(|(actor_id, actor)| {
120                            (
121                                (*actor_id, actor.vnode_bitmap.clone()),
122                                (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
123                            )
124                        })
125                        .unzip();
126                    FragmentInfo {
127                        distribution_type: info.distribution_type,
128                        actors,
129                        actor_location,
130                    }
131                })
132                .expect("non-duplicate");
133        }
134        Self {
135            fragments,
136            result: FragmentEdgeBuildResult {
137                upstreams: Default::default(),
138                dispatchers: Default::default(),
139                merge_updates: Default::default(),
140            },
141        }
142    }
143
144    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
145        for (fragment_id, relations) in relations {
146            for relation in relations {
147                self.add_edge(*fragment_id, relation);
148            }
149        }
150    }
151
152    pub(super) fn add_edge(
153        &mut self,
154        fragment_id: FragmentId,
155        downstream: &DownstreamFragmentRelation,
156    ) {
157        let fragment = &self
158            .fragments
159            .get(&fragment_id)
160            .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
161        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
162        let dispatchers = compose_dispatchers(
163            fragment.distribution_type,
164            &fragment.actors,
165            downstream.downstream_fragment_id,
166            downstream_fragment.distribution_type,
167            &downstream_fragment.actors,
168            downstream.dispatcher_type,
169            downstream.dist_key_indices.clone(),
170            downstream.output_mapping.clone(),
171        );
172        let downstream_fragment_upstreams = self
173            .result
174            .upstreams
175            .entry(downstream.downstream_fragment_id)
176            .or_default();
177        for (actor_id, dispatcher) in dispatchers {
178            let actor_location = &fragment.actor_location[&actor_id];
179            for downstream_actor in &dispatcher.downstream_actor_id {
180                downstream_fragment_upstreams
181                    .entry(*downstream_actor)
182                    .or_default()
183                    .entry(fragment_id)
184                    .or_default()
185                    .insert(
186                        actor_id,
187                        ActorInfo {
188                            actor_id,
189                            host: Some(actor_location.clone()),
190                        },
191                    );
192            }
193            self.result
194                .dispatchers
195                .entry(fragment_id)
196                .or_default()
197                .entry(actor_id)
198                .or_default()
199                .push(dispatcher);
200        }
201    }
202
203    pub(super) fn replace_upstream(
204        &mut self,
205        fragment_id: FragmentId,
206        original_upstream_fragment_id: FragmentId,
207        new_upstream_fragment_id: FragmentId,
208    ) {
209        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
210        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
211            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
212                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
213                    fragment_merge_updates.push(MergeUpdate {
214                        actor_id,
215                        upstream_fragment_id: original_upstream_fragment_id,
216                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
217                        added_upstream_actors: new_upstreams.into_values().collect(),
218                        removed_upstream_actor_id: vec![],
219                    })
220                } else if cfg!(debug_assertions) {
221                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
222                } else {
223                    warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
224                }
225                !actor_upstreams.is_empty()
226            })
227        } else if cfg!(debug_assertions) {
228            panic!(
229                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
230                fragment_id,
231                new_upstream_fragment_id,
232                original_upstream_fragment_id,
233                self.result.upstreams
234            );
235        } else {
236            warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
237        }
238    }
239
240    pub(super) fn build(self) -> FragmentEdgeBuildResult {
241        self.result
242    }
243}