1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::common::{ActorInfo, HostAddress};
21use risingwave_pb::id::SubscriberId;
22use risingwave_pb::stream_plan::StreamNode;
23use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
24use tracing::warn;
25
26use crate::barrier::rpc::ControlStreamManager;
27use crate::controller::fragment::InflightFragmentInfo;
28use crate::controller::utils::compose_dispatchers;
29use crate::model::{
30 ActorId, ActorUpstreams, DownstreamFragmentRelation, FragmentActorDispatchers,
31 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
32};
33
34#[derive(Debug)]
35struct FragmentInfo {
36 distribution_type: DistributionType,
37 actors: HashMap<ActorId, Option<Bitmap>>,
38 actor_location: HashMap<ActorId, HostAddress>,
39}
40
41pub(super) struct FragmentEdgeBuildResult {
42 pub(super) upstreams: HashMap<FragmentId, HashMap<ActorId, ActorUpstreams>>,
43 pub(super) dispatchers: FragmentActorDispatchers,
44 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
45}
46
47impl FragmentEdgeBuildResult {
48 pub(super) fn collect_actors_to_create(
49 &mut self,
50 actors: impl Iterator<
51 Item = (
52 FragmentId,
53 &StreamNode,
54 impl Iterator<Item = (&StreamActor, WorkerId)>,
55 impl IntoIterator<Item = SubscriberId>,
56 ),
57 >,
58 ) -> StreamJobActorsToCreate {
59 let mut actors_to_create = StreamJobActorsToCreate::default();
60 for (fragment_id, node, actors, subscriber_ids) in actors {
61 let subscriber_ids: HashSet<_> = subscriber_ids.into_iter().collect();
62 for (actor, worker_id) in actors {
63 let upstreams = self
64 .upstreams
65 .get_mut(&fragment_id)
66 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
67 .unwrap_or_default();
68 let dispatchers = self
69 .dispatchers
70 .get_mut(&fragment_id)
71 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
72 .unwrap_or_default();
73 actors_to_create
74 .entry(worker_id)
75 .or_default()
76 .entry(fragment_id)
77 .or_insert_with(|| (node.clone(), vec![], subscriber_ids.clone()))
78 .1
79 .push((actor.clone(), upstreams, dispatchers))
80 }
81 }
82 actors_to_create
83 }
84}
85
86pub(super) struct FragmentEdgeBuilder {
87 fragments: HashMap<FragmentId, FragmentInfo>,
88 result: FragmentEdgeBuildResult,
89}
90
91impl FragmentEdgeBuilder {
92 pub(super) fn new(
93 fragment_infos: impl Iterator<Item = &InflightFragmentInfo>,
94 control_stream_manager: &ControlStreamManager,
95 ) -> Self {
96 let mut fragments = HashMap::new();
97 for info in fragment_infos {
98 fragments
99 .try_insert(info.fragment_id, {
100 let (actors, actor_location) = info
101 .actors
102 .iter()
103 .map(|(actor_id, actor)| {
104 (
105 (*actor_id, actor.vnode_bitmap.clone()),
106 (*actor_id, control_stream_manager.host_addr(actor.worker_id)),
107 )
108 })
109 .unzip();
110 FragmentInfo {
111 distribution_type: info.distribution_type,
112 actors,
113 actor_location,
114 }
115 })
116 .expect("non-duplicate");
117 }
118 Self {
119 fragments,
120 result: FragmentEdgeBuildResult {
121 upstreams: Default::default(),
122 dispatchers: Default::default(),
123 merge_updates: Default::default(),
124 },
125 }
126 }
127
128 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
129 for (fragment_id, relations) in relations {
130 for relation in relations {
131 self.add_edge(*fragment_id, relation);
132 }
133 }
134 }
135
136 pub(super) fn add_edge(
137 &mut self,
138 fragment_id: FragmentId,
139 downstream: &DownstreamFragmentRelation,
140 ) {
141 let fragment = &self
142 .fragments
143 .get(&fragment_id)
144 .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
145 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
146 let dispatchers = compose_dispatchers(
147 fragment.distribution_type,
148 &fragment.actors,
149 downstream.downstream_fragment_id,
150 downstream_fragment.distribution_type,
151 &downstream_fragment.actors,
152 downstream.dispatcher_type,
153 downstream.dist_key_indices.clone(),
154 downstream.output_mapping.clone(),
155 );
156 let downstream_fragment_upstreams = self
157 .result
158 .upstreams
159 .entry(downstream.downstream_fragment_id)
160 .or_default();
161 for (actor_id, dispatcher) in dispatchers {
162 let actor_location = &fragment.actor_location[&actor_id];
163 for downstream_actor in &dispatcher.downstream_actor_id {
164 downstream_fragment_upstreams
165 .entry(*downstream_actor)
166 .or_default()
167 .entry(fragment_id)
168 .or_default()
169 .insert(
170 actor_id,
171 ActorInfo {
172 actor_id,
173 host: Some(actor_location.clone()),
174 },
175 );
176 }
177 self.result
178 .dispatchers
179 .entry(fragment_id)
180 .or_default()
181 .entry(actor_id)
182 .or_default()
183 .push(dispatcher);
184 }
185 }
186
187 pub(super) fn replace_upstream(
188 &mut self,
189 fragment_id: FragmentId,
190 original_upstream_fragment_id: FragmentId,
191 new_upstream_fragment_id: FragmentId,
192 ) {
193 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
194 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
195 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
196 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
197 fragment_merge_updates.push(MergeUpdate {
198 actor_id,
199 upstream_fragment_id: original_upstream_fragment_id,
200 new_upstream_fragment_id: Some(new_upstream_fragment_id),
201 added_upstream_actors: new_upstreams.into_values().collect(),
202 removed_upstream_actor_id: vec![],
203 })
204 } else if cfg!(debug_assertions) {
205 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
206 } else {
207 warn!(%actor_id, %fragment_id, %new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
208 }
209 !actor_upstreams.is_empty()
210 })
211 } else if cfg!(debug_assertions) {
212 panic!(
213 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
214 fragment_id,
215 new_upstream_fragment_id,
216 original_upstream_fragment_id,
217 self.result.upstreams
218 );
219 } else {
220 warn!(%fragment_id, %new_upstream_fragment_id, %original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
221 }
222 }
223
224 pub(super) fn build(self) -> FragmentEdgeBuildResult {
225 self.result
226 }
227}