1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress, WorkerNode};
21use risingwave_pb::id::{PartialGraphId, SubscriberId};
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30 ActorId, ActorNewNoShuffle, ActorUpstreams, DownstreamFragmentRelation, Fragment,
31 FragmentActorDispatchers, FragmentDownstreamRelation, FragmentId, StreamActor,
32 StreamJobActorsToCreate,
33};
34
35#[derive(Debug)]
39pub(super) struct EdgeBuilderFragmentInfo {
40 distribution_type: DistributionType,
41 actors: HashMap<ActorId, Option<Bitmap>>,
42 actor_location: HashMap<ActorId, HostAddress>,
43 partial_graph_id: PartialGraphId,
44}
45
46impl EdgeBuilderFragmentInfo {
47 pub fn from_inflight(
49 info: &InflightFragmentInfo,
50 partial_graph_id: PartialGraphId,
51 control_stream_manager: &ControlStreamManager,
52 ) -> Self {
53 let (actors, actor_location) = info
54 .actors
55 .iter()
56 .map(|(&actor_id, actor)| {
57 (
58 (actor_id, actor.vnode_bitmap.clone()),
59 (actor_id, control_stream_manager.host_addr(actor.worker_id)),
60 )
61 })
62 .unzip();
63 Self {
64 distribution_type: info.distribution_type,
65 actors,
66 actor_location,
67 partial_graph_id,
68 }
69 }
70
71 pub fn from_inflight_with_worker_nodes(
77 info: &InflightFragmentInfo,
78 partial_graph_id: PartialGraphId,
79 worker_nodes: &HashMap<WorkerId, WorkerNode>,
80 ) -> Self {
81 let (actors, actor_location) = info
82 .actors
83 .iter()
84 .map(|(&actor_id, actor)| {
85 (
86 (actor_id, actor.vnode_bitmap.clone()),
87 (
88 actor_id,
89 worker_nodes[&actor.worker_id].host.clone().unwrap(),
90 ),
91 )
92 })
93 .unzip();
94 Self {
95 distribution_type: info.distribution_type,
96 actors,
97 actor_location,
98 partial_graph_id,
99 }
100 }
101
102 pub fn from_fragment(
104 fragment: &Fragment,
105 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
106 actor_worker: &HashMap<ActorId, WorkerId>,
107 partial_graph_id: PartialGraphId,
108 control_stream_manager: &ControlStreamManager,
109 ) -> Self {
110 let (actors, actor_location) = stream_actors
111 .get(&fragment.fragment_id)
112 .into_iter()
113 .flatten()
114 .map(|actor| {
115 (
116 (actor.actor_id, actor.vnode_bitmap.clone()),
117 (
118 actor.actor_id,
119 control_stream_manager.host_addr(actor_worker[&actor.actor_id]),
120 ),
121 )
122 })
123 .unzip();
124 Self {
125 distribution_type: fragment.distribution_type.into(),
126 actors,
127 actor_location,
128 partial_graph_id,
129 }
130 }
131}
132
133#[derive(Debug)]
134pub(super) struct FragmentEdgeBuildResult {
135 upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
136 pub(super) dispatchers: FragmentActorDispatchers,
137 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
138 actor_new_no_shuffle: ActorNewNoShuffle,
139}
140
141impl FragmentEdgeBuildResult {
142 pub(super) fn actor_new_no_shuffle(&self) -> &ActorNewNoShuffle {
143 &self.actor_new_no_shuffle
144 }
145
146 pub(super) fn collect_actors_to_create(
147 &mut self,
148 actors: impl Iterator<
149 Item = (
150 FragmentId,
151 &StreamNode,
152 impl Iterator<Item = (&StreamActor, WorkerId)>,
153 impl IntoIterator<Item = SubscriberId>,
154 ),
155 >,
156 ) -> StreamJobActorsToCreate {
157 let mut actors_to_create = StreamJobActorsToCreate::default();
158 for (fragment_id, node, actors, subscriber_ids) in actors {
159 let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
160 for (actor, worker_id) in actors {
161 let upstreams = self
162 .upstreams
163 .get_mut(&fragment_id)
164 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
165 .unwrap_or_default();
166 let dispatchers = self
167 .dispatchers
168 .get_mut(&fragment_id)
169 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
170 .unwrap_or_default();
171 actors_to_create
172 .entry(worker_id)
173 .or_default()
174 .entry(fragment_id)
175 .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
176 .1
177 .push((actor.clone(), upstreams, dispatchers))
178 }
179 }
180 actors_to_create
181 }
182
183 pub(super) fn is_empty(&self) -> bool {
184 self.merge_updates
185 .values()
186 .all(|updates| updates.is_empty())
187 && self.dispatchers.values().all(|dispatchers| {
188 dispatchers
189 .values()
190 .all(|dispatchers| dispatchers.is_empty())
191 })
192 && self
193 .merge_updates
194 .values()
195 .all(|updates| updates.is_empty())
196 }
197}
198
199pub(super) struct FragmentEdgeBuilder {
200 fragments: HashMap<FragmentId, EdgeBuilderFragmentInfo>,
201 result: FragmentEdgeBuildResult,
202}
203
204impl FragmentEdgeBuilder {
205 pub(super) fn new(
207 fragment_infos: impl IntoIterator<Item = (FragmentId, EdgeBuilderFragmentInfo)>,
208 ) -> Self {
209 let mut fragments = HashMap::new();
210 for (fragment_id, info) in fragment_infos {
211 fragments
212 .try_insert(fragment_id, info)
213 .expect("non-duplicate");
214 }
215 Self {
216 fragments,
217 result: FragmentEdgeBuildResult {
218 upstreams: Default::default(),
219 dispatchers: Default::default(),
220 merge_updates: Default::default(),
221 actor_new_no_shuffle: Default::default(),
222 },
223 }
224 }
225
226 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
227 for (fragment_id, relations) in relations {
228 for relation in relations {
229 self.add_edge(*fragment_id, relation);
230 }
231 }
232 }
233
234 pub(super) fn add_edge(
235 &mut self,
236 fragment_id: FragmentId,
237 downstream: &DownstreamFragmentRelation,
238 ) {
239 let Some(fragment) = &self.fragments.get(&fragment_id) else {
240 if self
241 .fragments
242 .contains_key(&downstream.downstream_fragment_id)
243 {
244 panic!(
245 "cannot find fragment {} with downstream {:?}",
246 fragment_id, downstream
247 )
248 } else {
249 return;
251 }
252 };
253 let Some(downstream_fragment) = &self.fragments.get(&downstream.downstream_fragment_id)
254 else {
255 return;
258 };
259 let (dispatchers, no_shuffle_map) = compose_dispatchers(
260 fragment.distribution_type,
261 &fragment.actors,
262 downstream.downstream_fragment_id,
263 downstream_fragment.distribution_type,
264 &downstream_fragment.actors,
265 downstream.dispatcher_type,
266 downstream.dist_key_indices.clone(),
267 downstream.output_mapping.clone(),
268 );
269 if let Some(no_shuffle_map) = no_shuffle_map {
270 self.result
271 .actor_new_no_shuffle
272 .entry(fragment_id)
273 .or_default()
274 .insert(downstream.downstream_fragment_id, no_shuffle_map);
275 }
276 let downstream_fragment_upstreams = self
277 .result
278 .upstreams
279 .entry(downstream.downstream_fragment_id)
280 .or_default();
281 for (actor_id, dispatcher) in dispatchers {
282 let actor_location = &fragment.actor_location[&actor_id];
283 for downstream_actor in &dispatcher.downstream_actor_id {
284 downstream_fragment_upstreams
285 .entry(*downstream_actor)
286 .or_default()
287 .entry(fragment_id)
288 .or_default()
289 .insert(
290 actor_id,
291 ActorInfo {
292 actor_id,
293 host: Some(actor_location.clone()),
294 partial_graph_id: fragment.partial_graph_id,
295 },
296 );
297 }
298 self.result
299 .dispatchers
300 .entry(fragment_id)
301 .or_default()
302 .entry(actor_id)
303 .or_default()
304 .push(dispatcher);
305 }
306 }
307
308 pub(super) fn replace_upstream(
309 &mut self,
310 fragment_id: FragmentId,
311 original_upstream_fragment_id: FragmentId,
312 new_upstream_fragment_id: FragmentId,
313 ) {
314 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
315 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
316 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
317 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
318 fragment_merge_updates.push(MergeUpdate {
319 actor_id,
320 upstream_fragment_id: original_upstream_fragment_id,
321 new_upstream_fragment_id: Some(new_upstream_fragment_id),
322 added_upstream_actors: new_upstreams.into_values().collect(),
323 removed_upstream_actor_id: vec![],
324 })
325 } else if cfg!(debug_assertions) {
326 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
327 } else {
328 warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
329 }
330 !actor_upstreams.is_empty()
331 })
332 } else if cfg!(debug_assertions) {
333 panic!(
334 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
335 fragment_id,
336 new_upstream_fragment_id,
337 original_upstream_fragment_id,
338 self.result.upstreams
339 );
340 } else {
341 warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
342 }
343 }
344
345 pub(super) fn build(self) -> FragmentEdgeBuildResult {
347 self.result
348 }
349}