risingwave_meta/barrier/
edge_builder.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::SubscriberId;
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30    ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
31    FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
32};
33
34#[derive(Debug)]
35struct FragmentInfo {
36    distribution_type: DistributionType,
37    actors: HashMap<ActorId, Option<Bitmap>>,
38    actor_location: HashMap<ActorId, HostAddress>,
39}
40
41pub(super) struct FragmentEdgeBuildResult {
42    pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
43    pub(super) dispatchers: FragmentActorDispatchers,
44    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
45}
46
47impl FragmentEdgeBuildResult {
48    pub(super) fn collect_actors_to_create(
49        &mut self,
50        actors: impl Iterator<
51            Item = (
52                FragmentId,
53                &StreamNode,
54                impl Iterator<Item = (&StreamActor, WorkerId)>,
55                impl IntoIterator<Item = SubscriberId>,
56            ),
57        >,
58    ) -> StreamJobActorsToCreate {
59        let mut actors_to_create = StreamJobActorsToCreate::default();
60        for (fragment_id, node, actors, subscriber_ids) in actors {
61            let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
62            for (actor, worker_id) in actors {
63                let upstreams = self
64                    .upstreams
65                    .get_mut(&fragment_id)
66                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
67                    .unwrap_or_default();
68                let dispatchers = self
69                    .dispatchers
70                    .get_mut(&fragment_id)
71                    .and_then(|upstreams| upstreams.remove(&actor.actor_id))
72                    .unwrap_or_default();
73                actors_to_create
74                    .entry(worker_id)
75                    .or_default()
76                    .entry(fragment_id)
77                    .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
78                    .1
79                    .push((actor.clone(), upstreams, dispatchers))
80            }
81        }
82        actors_to_create
83    }
84}
85
86pub(super) struct FragmentEdgeBuilder {
87    fragments: HashMap<FragmentId, FragmentInfo>,
88    result: FragmentEdgeBuildResult,
89}
90
91impl FragmentEdgeBuilder {
92    pub(super) fn new(
93        fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
94        control_stream_manager: &ControlStreamManager,
95    ) -> Self {
96        let mut fragments = HashMap::new();
97        for info in fragment_infos {
98            fragments
99                .try_insert(info.fragment_id, {
100                    let (actors, actor_location) = info
101                        .actors
102                        .iter()
103                        .map(|(actor_id, actor)| {
104                            (
105                                (*actor_id, actor.vnode_bitmap.clone()),
106                                (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
107                            )
108                        })
109                        .unzip();
110                    FragmentInfo {
111                        distribution_type: info.distribution_type,
112                        actors,
113                        actor_location,
114                    }
115                })
116                .expect("non-duplicate");
117        }
118        Self {
119            fragments,
120            result: FragmentEdgeBuildResult {
121                upstreams: Default::default(),
122                dispatchers: Default::default(),
123                merge_updates: Default::default(),
124            },
125        }
126    }
127
128    pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
129        for (fragment_id, relations) in relations {
130            for relation in relations {
131                self.add_edge(*fragment_id, relation);
132            }
133        }
134    }
135
136    pub(super) fn add_edge(
137        &mut self,
138        fragment_id: FragmentId,
139        downstream: &DownstreamFragmentRelation,
140    ) {
141        let fragment = &self
142            .fragments
143            .get(&fragment_id)
144            .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
145        let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
146        let dispatchers = compose_dispatchers(
147            fragment.distribution_type,
148            &fragment.actors,
149            downstream.downstream_fragment_id,
150            downstream_fragment.distribution_type,
151            &downstream_fragment.actors,
152            downstream.dispatcher_type,
153            downstream.dist_key_indices.clone(),
154            downstream.output_mapping.clone(),
155        );
156        let downstream_fragment_upstreams = self
157            .result
158            .upstreams
159            .entry(downstream.downstream_fragment_id)
160            .or_default();
161        for (actor_id, dispatcher) in dispatchers {
162            let actor_location = &fragment.actor_location[&actor_id];
163            for downstream_actor in &dispatcher.downstream_actor_id {
164                downstream_fragment_upstreams
165                    .entry(*downstream_actor)
166                    .or_default()
167                    .entry(fragment_id)
168                    .or_default()
169                    .insert(
170                        actor_id,
171                        ActorInfo {
172                            actor_id,
173                            host: Some(actor_location.clone()),
174                        },
175                    );
176            }
177            self.result
178                .dispatchers
179                .entry(fragment_id)
180                .or_default()
181                .entry(actor_id)
182                .or_default()
183                .push(dispatcher);
184        }
185    }
186
187    pub(super) fn replace_upstream(
188        &mut self,
189        fragment_id: FragmentId,
190        original_upstream_fragment_id: FragmentId,
191        new_upstream_fragment_id: FragmentId,
192    ) {
193        let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
194        if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
195            fragment_upstreams.retain(|&actor_id, actor_upstreams| {
196                if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
197                    fragment_merge_updates.push(MergeUpdate {
198                        actor_id,
199                        upstream_fragment_id: original_upstream_fragment_id,
200                        new_upstream_fragment_id: Some(new_upstream_fragment_id),
201                        added_upstream_actors: new_upstreams.into_values().collect(),
202                        removed_upstream_actor_id: vec![],
203                    })
204                } else if cfg!(debug_assertions) {
205                    panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
206                } else {
207                    warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
208                }
209                !actor_upstreams.is_empty()
210            })
211        } else if cfg!(debug_assertions) {
212            panic!(
213                "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
214                fragment_id,
215                new_upstream_fragment_id,
216                original_upstream_fragment_id,
217                self.result.upstreams
218            );
219        } else {
220            warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
221        }
222    }
223
224    pub(super) fn build(self) -> FragmentEdgeBuildResult {
225        self.result
226    }
227}