1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::controller::fragment::InflightFragmentInfo;
21use crate::model::FragmentId;
22use crate::stream::ExtendedFragmentBackfillOrder;
23
24#[derive(Clone, Debug, Default)]
25pub struct BackfillNode {
26 fragment_id: FragmentId,
27 remaining_actors: HashSet<ActorId>,
30 remaining_dependencies: HashSet<FragmentId>,
33 children: Vec<FragmentId>,
34}
35
36#[derive(Clone, Debug, Default)]
40pub struct BackfillOrderState {
41 current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
43 remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
45 actor_to_fragment_id: HashMap<ActorId, FragmentId>,
47 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
49}
50
51pub fn get_nodes_with_backfill_dependencies(
54 backfill_orders: &ExtendedFragmentBackfillOrder,
55) -> HashSet<FragmentId> {
56 backfill_orders.values().flatten().copied().collect()
57}
58
59impl BackfillOrderState {
61 pub fn new(
62 backfill_orders: &ExtendedFragmentBackfillOrder,
63 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
64 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
65 ) -> Self {
66 tracing::debug!(?backfill_orders, "initialize backfill order state");
67 let actor_to_fragment_id: HashMap<ActorId, FragmentId> = fragment_infos
68 .iter()
69 .flat_map(|(fragment_id, fragment)| {
70 fragment
71 .actors
72 .keys()
73 .map(|actor_id| (*actor_id, *fragment_id))
74 })
75 .collect();
76
77 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
78
79 for fragment in fragment_infos.values() {
80 if fragment.fragment_type_mask.contains_any([
81 FragmentTypeFlag::StreamScan,
82 FragmentTypeFlag::SourceScan,
83 FragmentTypeFlag::LocalityProvider,
84 ]) {
85 let fragment_id = fragment.fragment_id;
86 backfill_nodes.insert(
87 fragment_id,
88 BackfillNode {
89 fragment_id,
90 remaining_actors: fragment.actors.keys().copied().collect(),
91 remaining_dependencies: Default::default(),
92 children: backfill_orders
93 .get(&fragment_id)
94 .cloned()
95 .unwrap_or_else(Vec::new),
96 },
97 );
98 }
99 }
100
101 for (fragment_id, children) in backfill_orders.iter() {
102 for child in children {
103 let child_node = backfill_nodes.get_mut(child).unwrap();
104 child_node.remaining_dependencies.insert(*fragment_id);
105 }
106 }
107
108 let mut current_backfill_nodes = HashMap::new();
109 let mut remaining_backfill_nodes = HashMap::new();
110 for (fragment_id, node) in backfill_nodes {
111 if node.remaining_dependencies.is_empty() {
112 current_backfill_nodes.insert(fragment_id, node);
113 } else {
114 remaining_backfill_nodes.insert(fragment_id, node);
115 }
116 }
117
118 Self {
119 current_backfill_nodes,
120 remaining_backfill_nodes,
121 actor_to_fragment_id,
122 locality_fragment_state_table_mapping,
123 }
124 }
125
126 pub fn recover_from_fragment_infos(
127 backfill_orders: &ExtendedFragmentBackfillOrder,
128 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
129 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
130 ) -> Self {
131 tracing::debug!(
132 ?backfill_orders,
133 "initialize backfill order state from recovery"
134 );
135 let actor_to_fragment_id = fragment_infos
136 .iter()
137 .flat_map(|(fragment_id, fragment)| {
138 fragment
139 .actors
140 .keys()
141 .map(|actor_id| (*actor_id, *fragment_id))
142 })
143 .collect();
144
145 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
146
147 for (fragment_id, fragment) in fragment_infos {
148 if fragment.fragment_type_mask.contains_any([
149 FragmentTypeFlag::StreamScan,
150 FragmentTypeFlag::SourceScan,
151 FragmentTypeFlag::LocalityProvider,
152 ]) {
153 backfill_nodes.insert(
154 *fragment_id,
155 BackfillNode {
156 fragment_id: *fragment_id,
157 remaining_actors: fragment.actors.keys().copied().collect(),
158 remaining_dependencies: Default::default(),
159 children: backfill_orders
160 .get(fragment_id)
161 .cloned()
162 .unwrap_or_else(Vec::new),
163 },
164 );
165 }
166 }
167
168 for (fragment_id, children) in backfill_orders.iter() {
169 for child in children {
170 let child_node = backfill_nodes.get_mut(child).unwrap();
171 child_node.remaining_dependencies.insert(*fragment_id);
172 }
173 }
174
175 let mut current_backfill_nodes = HashMap::new();
176 let mut remaining_backfill_nodes = HashMap::new();
177 for (fragment_id, node) in backfill_nodes {
178 if node.remaining_dependencies.is_empty() {
179 current_backfill_nodes.insert(fragment_id, node);
180 } else {
181 remaining_backfill_nodes.insert(fragment_id, node);
182 }
183 }
184
185 Self {
186 current_backfill_nodes,
187 remaining_backfill_nodes,
188 actor_to_fragment_id,
189 locality_fragment_state_table_mapping,
190 }
191 }
192}
193
194impl BackfillOrderState {
196 pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
197 let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
198 tracing::error!(%actor_id, "fragment not found for actor");
199 return vec![];
200 };
201 let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
212 Some(node) => (node, true),
213 None => {
214 let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
215 tracing::error!(
216 %fragment_id,
217 %actor_id,
218 "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
219 );
220 return vec![];
221 };
222 (node, false)
223 }
224 };
225
226 assert!(node.remaining_actors.remove(&actor_id), "missing actor");
227 tracing::debug!(
228 %actor_id,
229 remaining_actors = node.remaining_actors.len(),
230 %fragment_id,
231 "finish_backfilling_actor"
232 );
233 if node.remaining_actors.is_empty() && is_in_order {
234 self.finish_fragment(*fragment_id)
235 } else {
236 vec![]
237 }
238 }
239
240 pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
241 let mut newly_scheduled = vec![];
242 if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
245 for child_id in &node.children {
246 let newly_scheduled_child_finished = {
247 let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
248 assert!(
249 child.remaining_dependencies.remove(&fragment_id),
250 "missing dependency"
251 );
252 if child.remaining_dependencies.is_empty() {
253 let should_schedule = !child.remaining_actors.is_empty();
254 if should_schedule {
255 tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
256 }
257 self.current_backfill_nodes
258 .insert(child.fragment_id, child.clone());
259 if should_schedule {
260 newly_scheduled.push(child.fragment_id);
261 }
262 child.remaining_actors.is_empty()
263 } else {
264 false
265 }
266 };
267 if newly_scheduled_child_finished {
268 newly_scheduled.extend(self.finish_fragment(*child_id));
269 }
270 }
271 } else {
272 tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
273 return vec![];
274 }
275 newly_scheduled
276 }
277
278 pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
279 self.current_backfill_nodes.keys().copied().collect()
280 }
281
282 pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
283 &self.locality_fragment_state_table_mapping
284 }
285
286 pub fn refresh_actors(
288 &mut self,
289 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
290 ) -> Vec<FragmentId> {
291 self.actor_to_fragment_id = fragment_actors
292 .iter()
293 .flat_map(|(fragment_id, actors)| {
294 actors.iter().map(|actor_id| (*actor_id, *fragment_id))
295 })
296 .collect();
297
298 for node in self
299 .current_backfill_nodes
300 .values_mut()
301 .chain(self.remaining_backfill_nodes.values_mut())
302 {
303 if let Some(actors) = fragment_actors.get(&node.fragment_id) {
304 node.remaining_actors = actors.iter().copied().collect();
305 } else {
306 node.remaining_actors.clear();
307 }
308 }
309
310 let finished_fragments: Vec<_> = self
311 .current_backfill_nodes
312 .iter()
313 .filter(|(_, node)| node.remaining_actors.is_empty())
314 .map(|(fragment_id, _)| *fragment_id)
315 .collect();
316
317 let mut newly_scheduled = vec![];
318 for fragment_id in finished_fragments {
319 newly_scheduled.extend(self.finish_fragment(fragment_id));
320 }
321 newly_scheduled
322 }
323}