1use std::collections::HashMap;
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::stream_plan::StreamNode;
22use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
23use tracing::warn;
24
25use crate::barrier::rpc::ControlStreamManager;
26use crate::controller::fragment::InflightFragmentInfo;
27use crate::controller::utils::compose_dispatchers;
28use crate::model::{
29 ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
30 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
31};
32
33#[derive(Debug)]
34struct FragmentInfo {
35 distribution_type: DistributionType,
36 actors: HashMap<ActorId, Option<Bitmap>>,
37 actor_location: HashMap<ActorId, HostAddress>,
38}
39
40pub(super) struct FragmentEdgeBuildResult {
41 pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
42 pub(super) dispatchers: FragmentActorDispatchers,
43 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
44}
45
46impl FragmentEdgeBuildResult {
47 pub(super) fn collect_actors_to_create(
48 &mut self,
49 actors: impl Iterator<
50 Item = (
51 FragmentId,
52 &StreamNode,
53 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
54 ),
55 >,
56 ) -> StreamJobActorsToCreate {
57 let mut actors_to_create = StreamJobActorsToCreate::default();
58 for (fragment_id, node, actors) in actors {
59 for (actor, worker_id) in actors {
60 let upstreams = self
61 .upstreams
62 .get_mut(&fragment_id)
63 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
64 .unwrap_or_default();
65 let dispatchers = self
66 .dispatchers
67 .get_mut(&fragment_id)
68 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
69 .unwrap_or_default();
70 actors_to_create
71 .entry(worker_id)
72 .or_default()
73 .entry(fragment_id)
74 .or_insert_with(|| (node.clone(), vec![]))
75 .1
76 .push((actor.clone(), upstreams, dispatchers))
77 }
78 }
79 actors_to_create
80 }
81}
82
83pub(super) struct FragmentEdgeBuilder {
84 fragments: HashMap<FragmentId, FragmentInfo>,
85 result: FragmentEdgeBuildResult,
86}
87
88impl FragmentEdgeBuilder {
89 pub(super) fn new(
90 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
91 control_stream_manager: &ControlStreamManager,
92 ) -> Self {
93 let mut fragments = HashMap::new();
94 for info in fragment_infos {
95 fragments
96 .try_insert(info.fragment_id, {
97 let (actors, actor_location) = info
98 .actors
99 .iter()
100 .map(|(actor_id, actor)| {
101 (
102 (*actor_id, actor.vnode_bitmap.clone()),
103 (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
104 )
105 })
106 .unzip();
107 FragmentInfo {
108 distribution_type: info.distribution_type,
109 actors,
110 actor_location,
111 }
112 })
113 .expect("non-duplicate");
114 }
115 Self {
116 fragments,
117 result: FragmentEdgeBuildResult {
118 upstreams: Default::default(),
119 dispatchers: Default::default(),
120 merge_updates: Default::default(),
121 },
122 }
123 }
124
125 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
126 for (fragment_id, relations) in relations {
127 for relation in relations {
128 self.add_edge(*fragment_id, relation);
129 }
130 }
131 }
132
133 fn add_edge(&mut self, fragment_id: FragmentId, downstream: &DownstreamFragmentRelation) {
134 let fragment = &self
135 .fragments
136 .get(&fragment_id)
137 .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
138 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
139 let dispatchers = compose_dispatchers(
140 fragment.distribution_type,
141 &fragment.actors,
142 downstream.downstream_fragment_id,
143 downstream_fragment.distribution_type,
144 &downstream_fragment.actors,
145 downstream.dispatcher_type,
146 downstream.dist_key_indices.clone(),
147 downstream.output_indices.clone(),
148 );
149 let downstream_fragment_upstreams = self
150 .result
151 .upstreams
152 .entry(downstream.downstream_fragment_id)
153 .or_default();
154 for (actor_id, dispatcher) in dispatchers {
155 let actor_location = &fragment.actor_location[&actor_id];
156 for downstream_actor in &dispatcher.downstream_actor_id {
157 downstream_fragment_upstreams
158 .entry(*downstream_actor)
159 .or_default()
160 .entry(fragment_id)
161 .or_default()
162 .insert(
163 actor_id,
164 ActorInfo {
165 actor_id,
166 host: Some(actor_location.clone()),
167 },
168 );
169 }
170 self.result
171 .dispatchers
172 .entry(fragment_id)
173 .or_default()
174 .entry(actor_id)
175 .or_default()
176 .push(dispatcher);
177 }
178 }
179
180 pub(super) fn replace_upstream(
181 &mut self,
182 fragment_id: FragmentId,
183 original_upstream_fragment_id: FragmentId,
184 new_upstream_fragment_id: FragmentId,
185 ) {
186 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
187 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
188 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
189 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
190 fragment_merge_updates.push(MergeUpdate {
191 actor_id,
192 upstream_fragment_id: original_upstream_fragment_id,
193 new_upstream_fragment_id: Some(new_upstream_fragment_id),
194 added_upstream_actors: new_upstreams.into_values().collect(),
195 removed_upstream_actor_id: vec![],
196 })
197 } else if cfg!(debug_assertions) {
198 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
199 } else {
200 warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
201 }
202 !actor_upstreams.is_empty()
203 })
204 } else if cfg!(debug_assertions) {
205 panic!(
206 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
207 fragment_id,
208 new_upstream_fragment_id,
209 original_upstream_fragment_id,
210 self.result.upstreams
211 );
212 } else {
213 warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
214 }
215 }
216
217 pub(super) fn build(self) -> FragmentEdgeBuildResult {
218 self.result
219 }
220}