1use std::collections::HashMap;
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::stream_plan::StreamNode;
22use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
23use tracing::warn;
24
25use crate::barrier::rpc::ControlStreamManager;
26use crate::controller::fragment::InflightFragmentInfo;
27use crate::controller::utils::compose_dispatchers;
28use crate::model::{
29 ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
30 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
31};
32
33#[derive(Debug)]
34struct FragmentInfo {
35 distribution_type: DistributionType,
36 actors: HashMap<ActorId, Option<Bitmap>>,
37 actor_location: HashMap<ActorId, HostAddress>,
38}
39
40pub(super) struct FragmentEdgeBuildResult {
41 pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
42 pub(super) dispatchers: FragmentActorDispatchers,
43 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
44}
45
46impl FragmentEdgeBuildResult {
47 pub(super) fn collect_actors_to_create(
48 &mut self,
49 actors: impl Iterator<
50 Item = (
51 FragmentId,
52 &StreamNode,
53 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
54 ),
55 >,
56 ) -> StreamJobActorsToCreate {
57 let mut actors_to_create = StreamJobActorsToCreate::default();
58 for (fragment_id, node, actors) in actors {
59 for (actor, worker_id) in actors {
60 let upstreams = self
61 .upstreams
62 .get_mut(&fragment_id)
63 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
64 .unwrap_or_default();
65 let dispatchers = self
66 .dispatchers
67 .get_mut(&fragment_id)
68 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
69 .unwrap_or_default();
70 actors_to_create
71 .entry(worker_id)
72 .or_default()
73 .entry(fragment_id)
74 .or_insert_with(|| (node.clone(), vec![]))
75 .1
76 .push((actor.clone(), upstreams, dispatchers))
77 }
78 }
79 actors_to_create
80 }
81}
82
83pub(super) struct FragmentEdgeBuilder {
84 fragments: HashMap<FragmentId, FragmentInfo>,
85 result: FragmentEdgeBuildResult,
86}
87
88impl FragmentEdgeBuilder {
89 pub(super) fn new(
90 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
91 control_stream_manager: &ControlStreamManager,
92 ) -> Self {
93 let mut fragments = HashMap::new();
94 for info in fragment_infos {
95 fragments
96 .try_insert(info.fragment_id, {
97 let (actors, actor_location) = info
98 .actors
99 .iter()
100 .map(|(actor_id, actor)| {
101 (
102 (*actor_id, actor.vnode_bitmap.clone()),
103 (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
104 )
105 })
106 .unzip();
107 FragmentInfo {
108 distribution_type: info.distribution_type,
109 actors,
110 actor_location,
111 }
112 })
113 .expect("non-duplicate");
114 }
115 Self {
116 fragments,
117 result: FragmentEdgeBuildResult {
118 upstreams: Default::default(),
119 dispatchers: Default::default(),
120 merge_updates: Default::default(),
121 },
122 }
123 }
124
125 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
126 for (fragment_id, relations) in relations {
127 for relation in relations {
128 self.add_edge(*fragment_id, relation);
129 }
130 }
131 }
132
133 pub(super) fn add_edge(
134 &mut self,
135 fragment_id: FragmentId,
136 downstream: &DownstreamFragmentRelation,
137 ) {
138 let fragment = &self
139 .fragments
140 .get(&fragment_id)
141 .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
142 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
143 let dispatchers = compose_dispatchers(
144 fragment.distribution_type,
145 &fragment.actors,
146 downstream.downstream_fragment_id,
147 downstream_fragment.distribution_type,
148 &downstream_fragment.actors,
149 downstream.dispatcher_type,
150 downstream.dist_key_indices.clone(),
151 downstream.output_mapping.clone(),
152 );
153 let downstream_fragment_upstreams = self
154 .result
155 .upstreams
156 .entry(downstream.downstream_fragment_id)
157 .or_default();
158 for (actor_id, dispatcher) in dispatchers {
159 let actor_location = &fragment.actor_location[&actor_id];
160 for downstream_actor in &dispatcher.downstream_actor_id {
161 downstream_fragment_upstreams
162 .entry(*downstream_actor)
163 .or_default()
164 .entry(fragment_id)
165 .or_default()
166 .insert(
167 actor_id,
168 ActorInfo {
169 actor_id,
170 host: Some(actor_location.clone()),
171 },
172 );
173 }
174 self.result
175 .dispatchers
176 .entry(fragment_id)
177 .or_default()
178 .entry(actor_id)
179 .or_default()
180 .push(dispatcher);
181 }
182 }
183
184 pub(super) fn replace_upstream(
185 &mut self,
186 fragment_id: FragmentId,
187 original_upstream_fragment_id: FragmentId,
188 new_upstream_fragment_id: FragmentId,
189 ) {
190 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
191 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
192 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
193 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
194 fragment_merge_updates.push(MergeUpdate {
195 actor_id,
196 upstream_fragment_id: original_upstream_fragment_id,
197 new_upstream_fragment_id: Some(new_upstream_fragment_id),
198 added_upstream_actors: new_upstreams.into_values().collect(),
199 removed_upstream_actor_id: vec![],
200 })
201 } else if cfg!(debug_assertions) {
202 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
203 } else {
204 warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
205 }
206 !actor_upstreams.is_empty()
207 })
208 } else if cfg!(debug_assertions) {
209 panic!(
210 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
211 fragment_id,
212 new_upstream_fragment_id,
213 original_upstream_fragment_id,
214 self.result.upstreams
215 );
216 } else {
217 warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
218 }
219 }
220
221 pub(super) fn build(self) -> FragmentEdgeBuildResult {
222 self.result
223 }
224}