1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::stream_plan::StreamNode;
22use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
23use tracing::warn;
24
25use crate::barrier::rpc::ControlStreamManager;
26use crate::controller::fragment::InflightFragmentInfo;
27use crate::controller::utils::compose_dispatchers;
28use crate::model::{
29 ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
30 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
31};
32
33#[derive(Debug)]
34struct FragmentInfo {
35 distribution_type: DistributionType,
36 actors: HashMap<ActorId, Option<Bitmap>>,
37 actor_location: HashMap<ActorId, HostAddress>,
38}
39
40pub(super) struct FragmentEdgeBuildResult {
41 pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
42 pub(super) dispatchers: FragmentActorDispatchers,
43 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
44}
45
46impl FragmentEdgeBuildResult {
47 pub(super) fn collect_actors_to_create(
48 &mut self,
49 actors: impl Iterator<
50 Item = (
51 FragmentId,
52 &StreamNode,
53 impl Iterator<Item = (&StreamActor, WorkerId)>,
54 impl IntoIterator<Item = u32>,
55 ),
56 >,
57 ) -> StreamJobActorsToCreate {
58 let mut actors_to_create = StreamJobActorsToCreate::default();
59 for (fragment_id, node, actors, subscriber_ids) in actors {
60 let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
61 for (actor, worker_id) in actors {
62 let upstreams = self
63 .upstreams
64 .get_mut(&fragment_id)
65 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
66 .unwrap_or_default();
67 let dispatchers = self
68 .dispatchers
69 .get_mut(&fragment_id)
70 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
71 .unwrap_or_default();
72 actors_to_create
73 .entry(worker_id)
74 .or_default()
75 .entry(fragment_id)
76 .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
77 .1
78 .push((actor.clone(), upstreams, dispatchers))
79 }
80 }
81 actors_to_create
82 }
83}
84
85pub(super) struct FragmentEdgeBuilder {
86 fragments: HashMap<FragmentId, FragmentInfo>,
87 result: FragmentEdgeBuildResult,
88}
89
90impl FragmentEdgeBuilder {
91 pub(super) fn new(
92 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
93 control_stream_manager: &ControlStreamManager,
94 ) -> Self {
95 let mut fragments = HashMap::new();
96 for info in fragment_infos {
97 fragments
98 .try_insert(info.fragment_id, {
99 let (actors, actor_location) = info
100 .actors
101 .iter()
102 .map(|(actor_id, actor)| {
103 (
104 (*actor_id, actor.vnode_bitmap.clone()),
105 (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
106 )
107 })
108 .unzip();
109 FragmentInfo {
110 distribution_type: info.distribution_type,
111 actors,
112 actor_location,
113 }
114 })
115 .expect("non-duplicate");
116 }
117 Self {
118 fragments,
119 result: FragmentEdgeBuildResult {
120 upstreams: Default::default(),
121 dispatchers: Default::default(),
122 merge_updates: Default::default(),
123 },
124 }
125 }
126
127 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
128 for (fragment_id, relations) in relations {
129 for relation in relations {
130 self.add_edge(*fragment_id, relation);
131 }
132 }
133 }
134
135 pub(super) fn add_edge(
136 &mut self,
137 fragment_id: FragmentId,
138 downstream: &DownstreamFragmentRelation,
139 ) {
140 let fragment = &self
141 .fragments
142 .get(&fragment_id)
143 .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
144 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
145 let dispatchers = compose_dispatchers(
146 fragment.distribution_type,
147 &fragment.actors,
148 downstream.downstream_fragment_id,
149 downstream_fragment.distribution_type,
150 &downstream_fragment.actors,
151 downstream.dispatcher_type,
152 downstream.dist_key_indices.clone(),
153 downstream.output_mapping.clone(),
154 );
155 let downstream_fragment_upstreams = self
156 .result
157 .upstreams
158 .entry(downstream.downstream_fragment_id)
159 .or_default();
160 for (actor_id, dispatcher) in dispatchers {
161 let actor_location = &fragment.actor_location[&actor_id];
162 for downstream_actor in &dispatcher.downstream_actor_id {
163 downstream_fragment_upstreams
164 .entry(*downstream_actor)
165 .or_default()
166 .entry(fragment_id)
167 .or_default()
168 .insert(
169 actor_id,
170 ActorInfo {
171 actor_id,
172 host: Some(actor_location.clone()),
173 },
174 );
175 }
176 self.result
177 .dispatchers
178 .entry(fragment_id)
179 .or_default()
180 .entry(actor_id)
181 .or_default()
182 .push(dispatcher);
183 }
184 }
185
186 pub(super) fn replace_upstream(
187 &mut self,
188 fragment_id: FragmentId,
189 original_upstream_fragment_id: FragmentId,
190 new_upstream_fragment_id: FragmentId,
191 ) {
192 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
193 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
194 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
195 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
196 fragment_merge_updates.push(MergeUpdate {
197 actor_id,
198 upstream_fragment_id: original_upstream_fragment_id,
199 new_upstream_fragment_id: Some(new_upstream_fragment_id),
200 added_upstream_actors: new_upstreams.into_values().collect(),
201 removed_upstream_actor_id: vec![],
202 })
203 } else if cfg!(debug_assertions) {
204 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
205 } else {
206 warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
207 }
208 !actor_upstreams.is_empty()
209 })
210 } else if cfg!(debug_assertions) {
211 panic!(
212 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
213 fragment_id,
214 new_upstream_fragment_id,
215 original_upstream_fragment_id,
216 self.result.upstreams
217 );
218 } else {
219 warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
220 }
221 }
222
223 pub(super) fn build(self) -> FragmentEdgeBuildResult {
224 self.result
225 }
226}