1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::controller::fragment::InflightFragmentInfo;
21use crate::model::{FragmentId, StreamJobFragments};
22
/// Per-fragment bookkeeping for ordered backfill. The constructors of
/// `BackfillOrderState` create one node for every fragment flagged as
/// `StreamScan`, `SourceScan` or `LocalityProvider`.
#[derive(Clone, Debug, Default)]
pub struct BackfillNode {
    /// The fragment this node tracks.
    fragment_id: FragmentId,
    /// Actors of the fragment that have not yet reported that they finished backfilling.
    remaining_actors: HashSet<ActorId>,
    /// Fragments that must finish backfilling before this fragment may be scheduled.
    /// The node is scheduled once this set becomes empty.
    remaining_dependencies: HashSet<FragmentId>,
    /// Fragments that depend on this one (taken from the backfill order); their
    /// `remaining_dependencies` are updated when this fragment finishes.
    children: Vec<FragmentId>,
}
34
/// Tracks ordered backfill for a streaming job: which backfill fragments are
/// currently allowed to backfill and which are still waiting for the fragments
/// they depend on in the backfill order.
#[derive(Clone, Debug, Default)]
pub struct BackfillOrderState {
    /// Nodes whose dependencies have all finished, i.e. currently scheduled to backfill.
    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    /// Nodes that started out with unfinished dependencies. A node is scheduled into
    /// `current_backfill_nodes` once all of its dependencies have finished.
    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    /// Maps each actor to the fragment it belongs to; rebuilt by `refresh_actors`.
    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
    /// Caller-supplied mapping from fragment to state table ids, stored as-is and
    /// returned by `get_locality_fragment_state_table_mapping`
    /// (presumably keyed by `LocalityProvider` fragments — confirm with callers).
    locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}
49
50pub fn get_nodes_with_backfill_dependencies(
53 backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
54) -> HashSet<FragmentId> {
55 backfill_orders.values().flatten().copied().collect()
56}
57
58impl BackfillOrderState {
60 pub fn new(
61 backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
62 stream_job_fragments: &StreamJobFragments,
63 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
64 ) -> Self {
65 tracing::debug!(?backfill_orders, "initialize backfill order state");
66 let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
67
68 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
69
70 for fragment in stream_job_fragments.fragments() {
71 if fragment.fragment_type_mask.contains_any([
72 FragmentTypeFlag::StreamScan,
73 FragmentTypeFlag::SourceScan,
74 FragmentTypeFlag::LocalityProvider,
75 ]) {
76 let fragment_id = fragment.fragment_id;
77 backfill_nodes.insert(
78 fragment_id,
79 BackfillNode {
80 fragment_id,
81 remaining_actors: stream_job_fragments
82 .fragment_actors(fragment_id)
83 .iter()
84 .map(|actor| actor.actor_id)
85 .collect(),
86 remaining_dependencies: Default::default(),
87 children: backfill_orders
88 .get(&fragment_id)
89 .cloned()
90 .unwrap_or_else(Vec::new),
91 },
92 );
93 }
94 }
95
96 for (fragment_id, children) in backfill_orders {
97 for child in children {
98 let child_node = backfill_nodes.get_mut(child).unwrap();
99 child_node.remaining_dependencies.insert(*fragment_id);
100 }
101 }
102
103 let mut current_backfill_nodes = HashMap::new();
104 let mut remaining_backfill_nodes = HashMap::new();
105 for (fragment_id, node) in backfill_nodes {
106 if node.remaining_dependencies.is_empty() {
107 current_backfill_nodes.insert(fragment_id, node);
108 } else {
109 remaining_backfill_nodes.insert(fragment_id, node);
110 }
111 }
112
113 Self {
114 current_backfill_nodes,
115 remaining_backfill_nodes,
116 actor_to_fragment_id,
117 locality_fragment_state_table_mapping,
118 }
119 }
120
121 pub fn recover_from_fragment_infos(
122 backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
123 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
124 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
125 ) -> Self {
126 tracing::debug!(
127 ?backfill_orders,
128 "initialize backfill order state from recovery"
129 );
130 let actor_to_fragment_id = fragment_infos
131 .iter()
132 .flat_map(|(fragment_id, fragment)| {
133 fragment
134 .actors
135 .keys()
136 .map(|actor_id| (*actor_id, *fragment_id))
137 })
138 .collect();
139
140 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
141
142 for (fragment_id, fragment) in fragment_infos {
143 if fragment.fragment_type_mask.contains_any([
144 FragmentTypeFlag::StreamScan,
145 FragmentTypeFlag::SourceScan,
146 FragmentTypeFlag::LocalityProvider,
147 ]) {
148 backfill_nodes.insert(
149 *fragment_id,
150 BackfillNode {
151 fragment_id: *fragment_id,
152 remaining_actors: fragment.actors.keys().copied().collect(),
153 remaining_dependencies: Default::default(),
154 children: backfill_orders
155 .get(fragment_id)
156 .cloned()
157 .unwrap_or_else(Vec::new),
158 },
159 );
160 }
161 }
162
163 for (fragment_id, children) in backfill_orders {
164 for child in children {
165 let child_node = backfill_nodes.get_mut(child).unwrap();
166 child_node.remaining_dependencies.insert(*fragment_id);
167 }
168 }
169
170 let mut current_backfill_nodes = HashMap::new();
171 let mut remaining_backfill_nodes = HashMap::new();
172 for (fragment_id, node) in backfill_nodes {
173 if node.remaining_dependencies.is_empty() {
174 current_backfill_nodes.insert(fragment_id, node);
175 } else {
176 remaining_backfill_nodes.insert(fragment_id, node);
177 }
178 }
179
180 Self {
181 current_backfill_nodes,
182 remaining_backfill_nodes,
183 actor_to_fragment_id,
184 locality_fragment_state_table_mapping,
185 }
186 }
187}
188
189impl BackfillOrderState {
191 pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
192 let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
193 tracing::error!(%actor_id, "fragment not found for actor");
194 return vec![];
195 };
196 let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
207 Some(node) => (node, true),
208 None => {
209 let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
210 tracing::error!(
211 %fragment_id,
212 %actor_id,
213 "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
214 );
215 return vec![];
216 };
217 (node, false)
218 }
219 };
220
221 assert!(node.remaining_actors.remove(&actor_id), "missing actor");
222 tracing::debug!(
223 %actor_id,
224 remaining_actors = node.remaining_actors.len(),
225 %fragment_id,
226 "finish_backfilling_actor"
227 );
228 if node.remaining_actors.is_empty() && is_in_order {
229 self.finish_fragment(*fragment_id)
230 } else {
231 vec![]
232 }
233 }
234
235 pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
236 let mut newly_scheduled = vec![];
237 if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
240 for child_id in &node.children {
241 let newly_scheduled_child_finished = {
242 let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
243 assert!(
244 child.remaining_dependencies.remove(&fragment_id),
245 "missing dependency"
246 );
247 if child.remaining_dependencies.is_empty() {
248 tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
249 self.current_backfill_nodes
250 .insert(child.fragment_id, child.clone());
251 newly_scheduled.push(child.fragment_id);
252 child.remaining_actors.is_empty()
253 } else {
254 false
255 }
256 };
257 if newly_scheduled_child_finished {
258 newly_scheduled.extend(self.finish_fragment(*child_id));
259 }
260 }
261 } else {
262 tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
263 return vec![];
264 }
265 newly_scheduled
266 }
267
268 pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
269 self.current_backfill_nodes.keys().copied().collect()
270 }
271
272 pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
273 &self.locality_fragment_state_table_mapping
274 }
275
276 pub fn refresh_actors(
278 &mut self,
279 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
280 ) -> Vec<FragmentId> {
281 self.actor_to_fragment_id = fragment_actors
282 .iter()
283 .flat_map(|(fragment_id, actors)| {
284 actors.iter().map(|actor_id| (*actor_id, *fragment_id))
285 })
286 .collect();
287
288 for node in self
289 .current_backfill_nodes
290 .values_mut()
291 .chain(self.remaining_backfill_nodes.values_mut())
292 {
293 if let Some(actors) = fragment_actors.get(&node.fragment_id) {
294 node.remaining_actors = actors.iter().copied().collect();
295 } else {
296 node.remaining_actors.clear();
297 }
298 }
299
300 let finished_fragments: Vec<_> = self
301 .current_backfill_nodes
302 .iter()
303 .filter(|(_, node)| node.remaining_actors.is_empty())
304 .map(|(fragment_id, _)| *fragment_id)
305 .collect();
306
307 let mut newly_scheduled = vec![];
308 for fragment_id in finished_fragments {
309 newly_scheduled.extend(self.finish_fragment(fragment_id));
310 }
311 newly_scheduled
312 }
313}