1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::{PartialGraphId, SubscriberId};
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30 ActorId, ActorNewNoShuffle, ActorUpstreams, DownstreamFragmentRelation, Fragment,
31 FragmentActorDispatchers, FragmentDownstreamRelation, FragmentId, StreamActor,
32 StreamJobActorsToCreate,
33};
34
/// Per-fragment data that [`FragmentEdgeBuilder`] reads when composing
/// dispatchers and upstream actor info for an edge.
#[derive(Debug)]
pub(super) struct EdgeBuilderFragmentInfo {
    /// Distribution type of the fragment, passed to `compose_dispatchers`.
    distribution_type: DistributionType,
    /// Actors of the fragment, each with its optional vnode bitmap.
    actors: HashMap<ActorId, Option<Bitmap>>,
    /// Host address resolved for each actor through the control stream manager.
    actor_location: HashMap<ActorId, HostAddress>,
    /// Partial graph id copied into every `ActorInfo` built for this fragment's actors.
    partial_graph_id: PartialGraphId,
}
45
46impl EdgeBuilderFragmentInfo {
47 pub fn from_inflight(
49 info: &InflightFragmentInfo,
50 partial_graph_id: PartialGraphId,
51 control_stream_manager: &ControlStreamManager,
52 ) -> Self {
53 let (actors, actor_location) = info
54 .actors
55 .iter()
56 .map(|(&actor_id, actor)| {
57 (
58 (actor_id, actor.vnode_bitmap.clone()),
59 (actor_id, control_stream_manager.host_addr(actor.worker_id)),
60 )
61 })
62 .unzip();
63 Self {
64 distribution_type: info.distribution_type,
65 actors,
66 actor_location,
67 partial_graph_id,
68 }
69 }
70
71 pub fn from_fragment(
73 fragment: &Fragment,
74 stream_actors: &HashMap<FragmentId, Vec<StreamActor>>,
75 actor_worker: &HashMap<ActorId, WorkerId>,
76 partial_graph_id: PartialGraphId,
77 control_stream_manager: &ControlStreamManager,
78 ) -> Self {
79 let (actors, actor_location) = stream_actors
80 .get(&fragment.fragment_id)
81 .into_iter()
82 .flatten()
83 .map(|actor| {
84 (
85 (actor.actor_id, actor.vnode_bitmap.clone()),
86 (
87 actor.actor_id,
88 control_stream_manager.host_addr(actor_worker[&actor.actor_id]),
89 ),
90 )
91 })
92 .unzip();
93 Self {
94 distribution_type: fragment.distribution_type.into(),
95 actors,
96 actor_location,
97 partial_graph_id,
98 }
99 }
100}
101
/// Output accumulated by [`FragmentEdgeBuilder`].
#[derive(Debug)]
pub(super) struct FragmentEdgeBuildResult {
    /// For each downstream fragment and each of its actors, the upstream actor
    /// info recorded while adding edges.
    upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
    /// Dispatchers composed for each upstream fragment's actors.
    pub(super) dispatchers: FragmentActorDispatchers,
    /// Merge updates produced by `replace_upstream`, keyed by the fragment
    /// whose upstream was replaced.
    pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
    /// No-shuffle mappings returned by `compose_dispatchers`, keyed by
    /// upstream fragment and then by downstream fragment.
    actor_new_no_shuffle: ActorNewNoShuffle,
}
109
110impl FragmentEdgeBuildResult {
111 pub(super) fn actor_new_no_shuffle(&self) -> &ActorNewNoShuffle {
112 &self.actor_new_no_shuffle
113 }
114
115 pub(super) fn collect_actors_to_create(
116 &mut self,
117 actors: impl Iterator<
118 Item = (
119 FragmentId,
120 &StreamNode,
121 impl Iterator<Item = (&StreamActor, WorkerId)>,
122 impl IntoIterator<Item = SubscriberId>,
123 ),
124 >,
125 ) -> StreamJobActorsToCreate {
126 let mut actors_to_create = StreamJobActorsToCreate::default();
127 for (fragment_id, node, actors, subscriber_ids) in actors {
128 let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
129 for (actor, worker_id) in actors {
130 let upstreams = self
131 .upstreams
132 .get_mut(&fragment_id)
133 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
134 .unwrap_or_default();
135 let dispatchers = self
136 .dispatchers
137 .get_mut(&fragment_id)
138 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
139 .unwrap_or_default();
140 actors_to_create
141 .entry(worker_id)
142 .or_default()
143 .entry(fragment_id)
144 .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
145 .1
146 .push((actor.clone(), upstreams, dispatchers))
147 }
148 }
149 actors_to_create
150 }
151
152 pub(super) fn is_empty(&self) -> bool {
153 self.merge_updates
154 .values()
155 .all(|updates| updates.is_empty())
156 && self.dispatchers.values().all(|dispatchers| {
157 dispatchers
158 .values()
159 .all(|dispatchers| dispatchers.is_empty())
160 })
161 && self
162 .merge_updates
163 .values()
164 .all(|updates| updates.is_empty())
165 }
166}
167
/// Incrementally builds a [`FragmentEdgeBuildResult`] from a fixed set of
/// fragments and the relations added between them.
pub(super) struct FragmentEdgeBuilder {
    /// Fragments known to the builder, keyed by fragment id.
    fragments: HashMap<FragmentId, EdgeBuilderFragmentInfo>,
    /// Result accumulated so far; returned by `build`.
    result: FragmentEdgeBuildResult,
}
172
173impl FragmentEdgeBuilder {
174 pub(super) fn new(
176 fragment_infos: impl IntoIterator<Item = (FragmentId, EdgeBuilderFragmentInfo)>,
177 ) -> Self {
178 let mut fragments = HashMap::new();
179 for (fragment_id, info) in fragment_infos {
180 fragments
181 .try_insert(fragment_id, info)
182 .expect("non-duplicate");
183 }
184 Self {
185 fragments,
186 result: FragmentEdgeBuildResult {
187 upstreams: Default::default(),
188 dispatchers: Default::default(),
189 merge_updates: Default::default(),
190 actor_new_no_shuffle: Default::default(),
191 },
192 }
193 }
194
195 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
196 for (fragment_id, relations) in relations {
197 for relation in relations {
198 self.add_edge(*fragment_id, relation);
199 }
200 }
201 }
202
203 pub(super) fn add_edge(
204 &mut self,
205 fragment_id: FragmentId,
206 downstream: &DownstreamFragmentRelation,
207 ) {
208 let Some(fragment) = &self.fragments.get(&fragment_id) else {
209 if self
210 .fragments
211 .contains_key(&downstream.downstream_fragment_id)
212 {
213 panic!(
214 "cannot find fragment {} with downstream {:?}",
215 fragment_id, downstream
216 )
217 } else {
218 return;
220 }
221 };
222 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
223 let (dispatchers, no_shuffle_map) = compose_dispatchers(
224 fragment.distribution_type,
225 &fragment.actors,
226 downstream.downstream_fragment_id,
227 downstream_fragment.distribution_type,
228 &downstream_fragment.actors,
229 downstream.dispatcher_type,
230 downstream.dist_key_indices.clone(),
231 downstream.output_mapping.clone(),
232 );
233 if let Some(no_shuffle_map) = no_shuffle_map {
234 self.result
235 .actor_new_no_shuffle
236 .entry(fragment_id)
237 .or_default()
238 .insert(downstream.downstream_fragment_id, no_shuffle_map);
239 }
240 let downstream_fragment_upstreams = self
241 .result
242 .upstreams
243 .entry(downstream.downstream_fragment_id)
244 .or_default();
245 for (actor_id, dispatcher) in dispatchers {
246 let actor_location = &fragment.actor_location[&actor_id];
247 for downstream_actor in &dispatcher.downstream_actor_id {
248 downstream_fragment_upstreams
249 .entry(*downstream_actor)
250 .or_default()
251 .entry(fragment_id)
252 .or_default()
253 .insert(
254 actor_id,
255 ActorInfo {
256 actor_id,
257 host: Some(actor_location.clone()),
258 partial_graph_id: fragment.partial_graph_id,
259 },
260 );
261 }
262 self.result
263 .dispatchers
264 .entry(fragment_id)
265 .or_default()
266 .entry(actor_id)
267 .or_default()
268 .push(dispatcher);
269 }
270 }
271
272 pub(super) fn replace_upstream(
273 &mut self,
274 fragment_id: FragmentId,
275 original_upstream_fragment_id: FragmentId,
276 new_upstream_fragment_id: FragmentId,
277 ) {
278 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
279 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
280 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
281 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
282 fragment_merge_updates.push(MergeUpdate {
283 actor_id,
284 upstream_fragment_id: original_upstream_fragment_id,
285 new_upstream_fragment_id: Some(new_upstream_fragment_id),
286 added_upstream_actors: new_upstreams.into_values().collect(),
287 removed_upstream_actor_id: vec![],
288 })
289 } else if cfg!(debug_assertions) {
290 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
291 } else {
292 warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
293 }
294 !actor_upstreams.is_empty()
295 })
296 } else if cfg!(debug_assertions) {
297 panic!(
298 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
299 fragment_id,
300 new_upstream_fragment_id,
301 original_upstream_fragment_id,
302 self.result.upstreams
303 );
304 } else {
305 warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
306 }
307 }
308
309 pub(super) fn build(self) -> FragmentEdgeBuildResult {
311 self.result
312 }
313}