1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::{PartialGraphId, SubscriberId};
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30 ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
31 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
32};
33
34#[derive(Debug)]
35struct FragmentInfo {
36 distribution_type: DistributionType,
37 actors: HashMap<ActorId, Option<Bitmap>>,
38 actor_location: HashMap<ActorId, HostAddress>,
39 partial_graph_id: PartialGraphId,
40}
41
42#[derive(Debug)]
43pub(super) struct FragmentEdgeBuildResult {
44 pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
45 pub(super) dispatchers: FragmentActorDispatchers,
46 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
47}
48
49impl FragmentEdgeBuildResult {
50 pub(super) fn collect_actors_to_create(
51 &mut self,
52 actors: impl Iterator<
53 Item = (
54 FragmentId,
55 &StreamNode,
56 impl Iterator<Item = (&StreamActor, WorkerId)>,
57 impl IntoIterator<Item = SubscriberId>,
58 ),
59 >,
60 ) -> StreamJobActorsToCreate {
61 let mut actors_to_create = StreamJobActorsToCreate::default();
62 for (fragment_id, node, actors, subscriber_ids) in actors {
63 let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
64 for (actor, worker_id) in actors {
65 let upstreams = self
66 .upstreams
67 .get_mut(&fragment_id)
68 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
69 .unwrap_or_default();
70 let dispatchers = self
71 .dispatchers
72 .get_mut(&fragment_id)
73 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
74 .unwrap_or_default();
75 actors_to_create
76 .entry(worker_id)
77 .or_default()
78 .entry(fragment_id)
79 .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
80 .1
81 .push((actor.clone(), upstreams, dispatchers))
82 }
83 }
84 actors_to_create
85 }
86
87 pub(super) fn is_empty(&self) -> bool {
88 self.merge_updates
89 .values()
90 .all(|updates| updates.is_empty())
91 && self.dispatchers.values().all(|dispatchers| {
92 dispatchers
93 .values()
94 .all(|dispatchers| dispatchers.is_empty())
95 })
96 && self
97 .merge_updates
98 .values()
99 .all(|updates| updates.is_empty())
100 }
101}
102
103pub(super) struct FragmentEdgeBuilder {
104 fragments: HashMap<FragmentId, FragmentInfo>,
105 result: FragmentEdgeBuildResult,
106}
107
108impl FragmentEdgeBuilder {
109 pub(super) fn new(
110 fragment_infos: impl Iterator<Item = (&InflightFragmentInfo, PartialGraphId)>,
111 control_stream_manager: &ControlStreamManager,
112 ) -> Self {
113 let mut fragments = HashMap::new();
114 for (info, partial_graph_id) in fragment_infos {
115 fragments
116 .try_insert(info.fragment_id, {
117 let (actors, actor_location) = info
118 .actors
119 .iter()
120 .map(|(actor_id, actor)| {
121 (
122 (*actor_id, actor.vnode_bitmap.clone()),
123 (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
124 )
125 })
126 .unzip();
127 FragmentInfo {
128 distribution_type: info.distribution_type,
129 actors,
130 actor_location,
131 partial_graph_id,
132 }
133 })
134 .expect("non-duplicate");
135 }
136 Self {
137 fragments,
138 result: FragmentEdgeBuildResult {
139 upstreams: Default::default(),
140 dispatchers: Default::default(),
141 merge_updates: Default::default(),
142 },
143 }
144 }
145
146 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
147 for (fragment_id, relations) in relations {
148 for relation in relations {
149 self.add_edge(*fragment_id, relation);
150 }
151 }
152 }
153
154 pub(super) fn add_edge(
155 &mut self,
156 fragment_id: FragmentId,
157 downstream: &DownstreamFragmentRelation,
158 ) {
159 let Some(fragment) = &self.fragments.get(&fragment_id) else {
160 if self
161 .fragments
162 .contains_key(&downstream.downstream_fragment_id)
163 {
164 panic!(
165 "cannot find fragment {} with downstream {:?}",
166 fragment_id, downstream
167 )
168 } else {
169 return;
171 }
172 };
173 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
174 let dispatchers = compose_dispatchers(
175 fragment.distribution_type,
176 &fragment.actors,
177 downstream.downstream_fragment_id,
178 downstream_fragment.distribution_type,
179 &downstream_fragment.actors,
180 downstream.dispatcher_type,
181 downstream.dist_key_indices.clone(),
182 downstream.output_mapping.clone(),
183 );
184 let downstream_fragment_upstreams = self
185 .result
186 .upstreams
187 .entry(downstream.downstream_fragment_id)
188 .or_default();
189 for (actor_id, dispatcher) in dispatchers {
190 let actor_location = &fragment.actor_location[&actor_id];
191 for downstream_actor in &dispatcher.downstream_actor_id {
192 downstream_fragment_upstreams
193 .entry(*downstream_actor)
194 .or_default()
195 .entry(fragment_id)
196 .or_default()
197 .insert(
198 actor_id,
199 ActorInfo {
200 actor_id,
201 host: Some(actor_location.clone()),
202 partial_graph_id: fragment.partial_graph_id,
203 },
204 );
205 }
206 self.result
207 .dispatchers
208 .entry(fragment_id)
209 .or_default()
210 .entry(actor_id)
211 .or_default()
212 .push(dispatcher);
213 }
214 }
215
216 pub(super) fn replace_upstream(
217 &mut self,
218 fragment_id: FragmentId,
219 original_upstream_fragment_id: FragmentId,
220 new_upstream_fragment_id: FragmentId,
221 ) {
222 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
223 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
224 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
225 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
226 fragment_merge_updates.push(MergeUpdate {
227 actor_id,
228 upstream_fragment_id: original_upstream_fragment_id,
229 new_upstream_fragment_id: Some(new_upstream_fragment_id),
230 added_upstream_actors: new_upstreams.into_values().collect(),
231 removed_upstream_actor_id: vec![],
232 })
233 } else if cfg!(debug_assertions) {
234 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
235 } else {
236 warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
237 }
238 !actor_upstreams.is_empty()
239 })
240 } else if cfg!(debug_assertions) {
241 panic!(
242 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
243 fragment_id,
244 new_upstream_fragment_id,
245 original_upstream_fragment_id,
246 self.result.upstreams
247 );
248 } else {
249 warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
250 }
251 }
252
253 pub(super) fn build(self) -> FragmentEdgeBuildResult {
254 self.result
255 }
256}