1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::SubscriberId;
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30 ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
31 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
32};
33
34#[derive(Debug)]
35struct FragmentInfo {
36 distribution_type: DistributionType,
37 actors: HashMap<ActorId, Option<Bitmap>>,
38 actor_location: HashMap<ActorId, HostAddress>,
39}
40
41#[derive(Debug)]
42pub(super) struct FragmentEdgeBuildResult {
43 pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
44 pub(super) dispatchers: FragmentActorDispatchers,
45 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
46}
47
48impl FragmentEdgeBuildResult {
49 pub(super) fn collect_actors_to_create(
50 &mut self,
51 actors: impl Iterator<
52 Item = (
53 FragmentId,
54 &StreamNode,
55 impl Iterator<Item = (&StreamActor, WorkerId)>,
56 impl IntoIterator<Item = SubscriberId>,
57 ),
58 >,
59 ) -> StreamJobActorsToCreate {
60 let mut actors_to_create = StreamJobActorsToCreate::default();
61 for (fragment_id, node, actors, subscriber_ids) in actors {
62 let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
63 for (actor, worker_id) in actors {
64 let upstreams = self
65 .upstreams
66 .get_mut(&fragment_id)
67 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
68 .unwrap_or_default();
69 let dispatchers = self
70 .dispatchers
71 .get_mut(&fragment_id)
72 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
73 .unwrap_or_default();
74 actors_to_create
75 .entry(worker_id)
76 .or_default()
77 .entry(fragment_id)
78 .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
79 .1
80 .push((actor.clone(), upstreams, dispatchers))
81 }
82 }
83 actors_to_create
84 }
85
86 pub(super) fn is_empty(&self) -> bool {
87 self.merge_updates
88 .values()
89 .all(|updates| updates.is_empty())
90 && self.dispatchers.values().all(|dispatchers| {
91 dispatchers
92 .values()
93 .all(|dispatchers| dispatchers.is_empty())
94 })
95 && self
96 .merge_updates
97 .values()
98 .all(|updates| updates.is_empty())
99 }
100}
101
102pub(super) struct FragmentEdgeBuilder {
103 fragments: HashMap<FragmentId, FragmentInfo>,
104 result: FragmentEdgeBuildResult,
105}
106
107impl FragmentEdgeBuilder {
108 pub(super) fn new(
109 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
110 control_stream_manager: &ControlStreamManager,
111 ) -> Self {
112 let mut fragments = HashMap::new();
113 for info in fragment_infos {
114 fragments
115 .try_insert(info.fragment_id, {
116 let (actors, actor_location) = info
117 .actors
118 .iter()
119 .map(|(actor_id, actor)| {
120 (
121 (*actor_id, actor.vnode_bitmap.clone()),
122 (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
123 )
124 })
125 .unzip();
126 FragmentInfo {
127 distribution_type: info.distribution_type,
128 actors,
129 actor_location,
130 }
131 })
132 .expect("non-duplicate");
133 }
134 Self {
135 fragments,
136 result: FragmentEdgeBuildResult {
137 upstreams: Default::default(),
138 dispatchers: Default::default(),
139 merge_updates: Default::default(),
140 },
141 }
142 }
143
144 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
145 for (fragment_id, relations) in relations {
146 for relation in relations {
147 self.add_edge(*fragment_id, relation);
148 }
149 }
150 }
151
152 pub(super) fn add_edge(
153 &mut self,
154 fragment_id: FragmentId,
155 downstream: &DownstreamFragmentRelation,
156 ) {
157 let fragment = &self
158 .fragments
159 .get(&fragment_id)
160 .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
161 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
162 let dispatchers = compose_dispatchers(
163 fragment.distribution_type,
164 &fragment.actors,
165 downstream.downstream_fragment_id,
166 downstream_fragment.distribution_type,
167 &downstream_fragment.actors,
168 downstream.dispatcher_type,
169 downstream.dist_key_indices.clone(),
170 downstream.output_mapping.clone(),
171 );
172 let downstream_fragment_upstreams = self
173 .result
174 .upstreams
175 .entry(downstream.downstream_fragment_id)
176 .or_default();
177 for (actor_id, dispatcher) in dispatchers {
178 let actor_location = &fragment.actor_location[&actor_id];
179 for downstream_actor in &dispatcher.downstream_actor_id {
180 downstream_fragment_upstreams
181 .entry(*downstream_actor)
182 .or_default()
183 .entry(fragment_id)
184 .or_default()
185 .insert(
186 actor_id,
187 ActorInfo {
188 actor_id,
189 host: Some(actor_location.clone()),
190 },
191 );
192 }
193 self.result
194 .dispatchers
195 .entry(fragment_id)
196 .or_default()
197 .entry(actor_id)
198 .or_default()
199 .push(dispatcher);
200 }
201 }
202
203 pub(super) fn replace_upstream(
204 &mut self,
205 fragment_id: FragmentId,
206 original_upstream_fragment_id: FragmentId,
207 new_upstream_fragment_id: FragmentId,
208 ) {
209 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
210 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
211 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
212 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
213 fragment_merge_updates.push(MergeUpdate {
214 actor_id,
215 upstream_fragment_id: original_upstream_fragment_id,
216 new_upstream_fragment_id: Some(new_upstream_fragment_id),
217 added_upstream_actors: new_upstreams.into_values().collect(),
218 removed_upstream_actor_id: vec![],
219 })
220 } else if cfg!(debug_assertions) {
221 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
222 } else {
223 warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
224 }
225 !actor_upstreams.is_empty()
226 })
227 } else if cfg!(debug_assertions) {
228 panic!(
229 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
230 fragment_id,
231 new_upstream_fragment_id,
232 original_upstream_fragment_id,
233 self.result.upstreams
234 );
235 } else {
236 warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
237 }
238 }
239
240 pub(super) fn build(self) -> FragmentEdgeBuildResult {
241 self.result
242 }
243}