risingwave_meta/barrier/
edge_builder.rs1use std::collections::HashMap;
16
17use risingwave_common::bitmap::Bitmap;
18use risingwave_meta_model::WorkerId;
19use risingwave_meta_model::fragment::DistributionType;
20use risingwave_pb::stream_plan::StreamNode;
21use risingwave_pb::stream_plan::update_mutation::MergeUpdate;
22use tracing::warn;
23
24use crate::controller::fragment::InflightFragmentInfo;
25use crate::controller::utils::compose_dispatchers;
26use crate::model::{
27 ActorId, DownstreamFragmentRelation, FragmentActorDispatchers, FragmentActorUpstreams,
28 FragmentDownstreamRelation, FragmentId, StreamActor, StreamJobActorsToCreate,
29};
30
31#[derive(Debug)]
32struct FragmentInfo {
33 distribution_type: DistributionType,
34 actors: HashMap<ActorId, Option<Bitmap>>,
35}
36
37pub(super) struct FragmentEdgeBuildResult {
38 pub(super) upstreams: HashMap<FragmentId, FragmentActorUpstreams>,
39 pub(super) dispatchers: FragmentActorDispatchers,
40 pub(super) merge_updates: HashMap<FragmentId, Vec<MergeUpdate>>,
41}
42
43impl FragmentEdgeBuildResult {
44 pub(super) fn collect_actors_to_create(
45 &mut self,
46 actors: impl Iterator<
47 Item = (
48 FragmentId,
49 &StreamNode,
50 impl Iterator<Item = (&StreamActor, WorkerId)> + '_,
51 ),
52 >,
53 ) -> StreamJobActorsToCreate {
54 let mut actors_to_create = StreamJobActorsToCreate::default();
55 for (fragment_id, node, actors) in actors {
56 for (actor, worker_id) in actors {
57 let upstreams = self
58 .upstreams
59 .get_mut(&fragment_id)
60 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
61 .unwrap_or_default();
62 let dispatchers = self
63 .dispatchers
64 .get_mut(&fragment_id)
65 .and_then(|upstreams| upstreams.remove(&actor.actor_id))
66 .unwrap_or_default();
67 actors_to_create
68 .entry(worker_id)
69 .or_default()
70 .entry(fragment_id)
71 .or_insert_with(|| (node.clone(), vec![]))
72 .1
73 .push((actor.clone(), upstreams, dispatchers))
74 }
75 }
76 actors_to_create
77 }
78}
79
80pub(super) struct FragmentEdgeBuilder {
81 fragments: HashMap<FragmentId, FragmentInfo>,
82 result: FragmentEdgeBuildResult,
83}
84
85impl FragmentEdgeBuilder {
86 pub(super) fn new(fragment_infos: impl Iterator<Item = &InflightFragmentInfo>) -> Self {
87 let mut fragments = HashMap::new();
88 for info in fragment_infos {
89 fragments
90 .try_insert(
91 info.fragment_id,
92 FragmentInfo {
93 distribution_type: info.distribution_type,
94 actors: info
95 .actors
96 .iter()
97 .map(|(actor_id, actor)| (*actor_id, actor.vnode_bitmap.clone()))
98 .collect(),
99 },
100 )
101 .expect("non-duplicate");
102 }
103 Self {
104 fragments,
105 result: FragmentEdgeBuildResult {
106 upstreams: Default::default(),
107 dispatchers: Default::default(),
108 merge_updates: Default::default(),
109 },
110 }
111 }
112
113 pub(super) fn add_relations(&mut self, relations: &FragmentDownstreamRelation) {
114 for (fragment_id, relations) in relations {
115 for relation in relations {
116 self.add_edge(*fragment_id, relation);
117 }
118 }
119 }
120
121 fn add_edge(&mut self, fragment_id: FragmentId, downstream: &DownstreamFragmentRelation) {
122 let fragment = &self
123 .fragments
124 .get(&fragment_id)
125 .unwrap_or_else(|| panic!("cannot find {}", fragment_id));
126 let downstream_fragment = &self.fragments[&downstream.downstream_fragment_id];
127 let dispatchers = compose_dispatchers(
128 fragment.distribution_type,
129 &fragment.actors,
130 downstream.downstream_fragment_id,
131 downstream_fragment.distribution_type,
132 &downstream_fragment.actors,
133 downstream.dispatcher_type,
134 downstream.dist_key_indices.clone(),
135 downstream.output_indices.clone(),
136 );
137 let downstream_fragment_upstreams = self
138 .result
139 .upstreams
140 .entry(downstream.downstream_fragment_id)
141 .or_default();
142 for (actor_id, dispatcher) in dispatchers {
143 for downstream_actor in &dispatcher.downstream_actor_id {
144 downstream_fragment_upstreams
145 .entry(*downstream_actor)
146 .or_default()
147 .entry(fragment_id)
148 .or_default()
149 .insert(actor_id);
150 }
151 self.result
152 .dispatchers
153 .entry(fragment_id)
154 .or_default()
155 .entry(actor_id)
156 .or_default()
157 .push(dispatcher);
158 }
159 }
160
161 pub(super) fn replace_upstream(
162 &mut self,
163 fragment_id: FragmentId,
164 original_upstream_fragment_id: FragmentId,
165 new_upstream_fragment_id: FragmentId,
166 ) {
167 let fragment_merge_updates = self.result.merge_updates.entry(fragment_id).or_default();
168 if let Some(fragment_upstreams) = self.result.upstreams.get_mut(&fragment_id) {
169 fragment_upstreams.retain(|&actor_id, actor_upstreams| {
170 if let Some(new_upstreams) = actor_upstreams.remove(&new_upstream_fragment_id) {
171 fragment_merge_updates.push(MergeUpdate {
172 actor_id,
173 upstream_fragment_id: original_upstream_fragment_id,
174 new_upstream_fragment_id: Some(new_upstream_fragment_id),
175 added_upstream_actor_id: new_upstreams.into_iter().collect(),
176 removed_upstream_actor_id: vec![],
177 })
178 } else if cfg!(debug_assertions) {
179 panic!("cannot find new upstreams for actor {} in fragment {} to new_upstream {}. Current upstreams {:?}", actor_id, fragment_id, new_upstream_fragment_id, actor_upstreams);
180 } else {
181 warn!(actor_id, fragment_id, new_upstream_fragment_id, ?actor_upstreams, "cannot find new upstreams for actor");
182 }
183 !actor_upstreams.is_empty()
184 })
185 } else if cfg!(debug_assertions) {
186 panic!(
187 "cannot find new upstreams for fragment {} to new_upstream {} to replace {}. Current upstreams: {:?}",
188 fragment_id,
189 new_upstream_fragment_id,
190 original_upstream_fragment_id,
191 self.result.upstreams
192 );
193 } else {
194 warn!(fragment_id, new_upstream_fragment_id, original_upstream_fragment_id, upstreams = ?self.result.upstreams, "cannot find new upstreams to replace");
195 }
196 }
197
198 pub(super) fn build(self) -> FragmentEdgeBuildResult {
199 self.result
200 }
201}