1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::controller::fragment::InflightFragmentInfo;
21use crate::model::{FragmentId, StreamJobFragments};
22use crate::stream::ExtendedFragmentBackfillOrder;
23
24#[derive(Clone, Debug, Default)]
25pub struct BackfillNode {
26 fragment_id: FragmentId,
27 remaining_actors: HashSet<ActorId>,
30 remaining_dependencies: HashSet<FragmentId>,
33 children: Vec<FragmentId>,
34}
35
36#[derive(Clone, Debug, Default)]
40pub struct BackfillOrderState {
41 current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
43 remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
45 actor_to_fragment_id: HashMap<ActorId, FragmentId>,
47 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
49}
50
51pub fn get_nodes_with_backfill_dependencies(
54 backfill_orders: &ExtendedFragmentBackfillOrder,
55) -> HashSet<FragmentId> {
56 backfill_orders.values().flatten().copied().collect()
57}
58
59impl BackfillOrderState {
61 pub fn new(
62 backfill_orders: &ExtendedFragmentBackfillOrder,
63 stream_job_fragments: &StreamJobFragments,
64 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
65 ) -> Self {
66 tracing::debug!(?backfill_orders, "initialize backfill order state");
67 let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
68
69 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
70
71 for fragment in stream_job_fragments.fragments() {
72 if fragment.fragment_type_mask.contains_any([
73 FragmentTypeFlag::StreamScan,
74 FragmentTypeFlag::SourceScan,
75 FragmentTypeFlag::LocalityProvider,
76 ]) {
77 let fragment_id = fragment.fragment_id;
78 backfill_nodes.insert(
79 fragment_id,
80 BackfillNode {
81 fragment_id,
82 remaining_actors: stream_job_fragments
83 .fragment_actors(fragment_id)
84 .iter()
85 .map(|actor| actor.actor_id)
86 .collect(),
87 remaining_dependencies: Default::default(),
88 children: backfill_orders
89 .get(&fragment_id)
90 .cloned()
91 .unwrap_or_else(Vec::new),
92 },
93 );
94 }
95 }
96
97 for (fragment_id, children) in backfill_orders.iter() {
98 for child in children {
99 let child_node = backfill_nodes.get_mut(child).unwrap();
100 child_node.remaining_dependencies.insert(*fragment_id);
101 }
102 }
103
104 let mut current_backfill_nodes = HashMap::new();
105 let mut remaining_backfill_nodes = HashMap::new();
106 for (fragment_id, node) in backfill_nodes {
107 if node.remaining_dependencies.is_empty() {
108 current_backfill_nodes.insert(fragment_id, node);
109 } else {
110 remaining_backfill_nodes.insert(fragment_id, node);
111 }
112 }
113
114 Self {
115 current_backfill_nodes,
116 remaining_backfill_nodes,
117 actor_to_fragment_id,
118 locality_fragment_state_table_mapping,
119 }
120 }
121
122 pub fn recover_from_fragment_infos(
123 backfill_orders: &ExtendedFragmentBackfillOrder,
124 fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
125 locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
126 ) -> Self {
127 tracing::debug!(
128 ?backfill_orders,
129 "initialize backfill order state from recovery"
130 );
131 let actor_to_fragment_id = fragment_infos
132 .iter()
133 .flat_map(|(fragment_id, fragment)| {
134 fragment
135 .actors
136 .keys()
137 .map(|actor_id| (*actor_id, *fragment_id))
138 })
139 .collect();
140
141 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
142
143 for (fragment_id, fragment) in fragment_infos {
144 if fragment.fragment_type_mask.contains_any([
145 FragmentTypeFlag::StreamScan,
146 FragmentTypeFlag::SourceScan,
147 FragmentTypeFlag::LocalityProvider,
148 ]) {
149 backfill_nodes.insert(
150 *fragment_id,
151 BackfillNode {
152 fragment_id: *fragment_id,
153 remaining_actors: fragment.actors.keys().copied().collect(),
154 remaining_dependencies: Default::default(),
155 children: backfill_orders
156 .get(fragment_id)
157 .cloned()
158 .unwrap_or_else(Vec::new),
159 },
160 );
161 }
162 }
163
164 for (fragment_id, children) in backfill_orders.iter() {
165 for child in children {
166 let child_node = backfill_nodes.get_mut(child).unwrap();
167 child_node.remaining_dependencies.insert(*fragment_id);
168 }
169 }
170
171 let mut current_backfill_nodes = HashMap::new();
172 let mut remaining_backfill_nodes = HashMap::new();
173 for (fragment_id, node) in backfill_nodes {
174 if node.remaining_dependencies.is_empty() {
175 current_backfill_nodes.insert(fragment_id, node);
176 } else {
177 remaining_backfill_nodes.insert(fragment_id, node);
178 }
179 }
180
181 Self {
182 current_backfill_nodes,
183 remaining_backfill_nodes,
184 actor_to_fragment_id,
185 locality_fragment_state_table_mapping,
186 }
187 }
188}
189
190impl BackfillOrderState {
192 pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
193 let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
194 tracing::error!(%actor_id, "fragment not found for actor");
195 return vec![];
196 };
197 let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
208 Some(node) => (node, true),
209 None => {
210 let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
211 tracing::error!(
212 %fragment_id,
213 %actor_id,
214 "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
215 );
216 return vec![];
217 };
218 (node, false)
219 }
220 };
221
222 assert!(node.remaining_actors.remove(&actor_id), "missing actor");
223 tracing::debug!(
224 %actor_id,
225 remaining_actors = node.remaining_actors.len(),
226 %fragment_id,
227 "finish_backfilling_actor"
228 );
229 if node.remaining_actors.is_empty() && is_in_order {
230 self.finish_fragment(*fragment_id)
231 } else {
232 vec![]
233 }
234 }
235
236 pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
237 let mut newly_scheduled = vec![];
238 if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
241 for child_id in &node.children {
242 let newly_scheduled_child_finished = {
243 let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
244 assert!(
245 child.remaining_dependencies.remove(&fragment_id),
246 "missing dependency"
247 );
248 if child.remaining_dependencies.is_empty() {
249 let should_schedule = !child.remaining_actors.is_empty();
250 if should_schedule {
251 tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
252 }
253 self.current_backfill_nodes
254 .insert(child.fragment_id, child.clone());
255 if should_schedule {
256 newly_scheduled.push(child.fragment_id);
257 }
258 child.remaining_actors.is_empty()
259 } else {
260 false
261 }
262 };
263 if newly_scheduled_child_finished {
264 newly_scheduled.extend(self.finish_fragment(*child_id));
265 }
266 }
267 } else {
268 tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
269 return vec![];
270 }
271 newly_scheduled
272 }
273
274 pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
275 self.current_backfill_nodes.keys().copied().collect()
276 }
277
278 pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
279 &self.locality_fragment_state_table_mapping
280 }
281
282 pub fn refresh_actors(
284 &mut self,
285 fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
286 ) -> Vec<FragmentId> {
287 self.actor_to_fragment_id = fragment_actors
288 .iter()
289 .flat_map(|(fragment_id, actors)| {
290 actors.iter().map(|actor_id| (*actor_id, *fragment_id))
291 })
292 .collect();
293
294 for node in self
295 .current_backfill_nodes
296 .values_mut()
297 .chain(self.remaining_backfill_nodes.values_mut())
298 {
299 if let Some(actors) = fragment_actors.get(&node.fragment_id) {
300 node.remaining_actors = actors.iter().copied().collect();
301 } else {
302 node.remaining_actors.clear();
303 }
304 }
305
306 let finished_fragments: Vec<_> = self
307 .current_backfill_nodes
308 .iter()
309 .filter(|(_, node)| node.remaining_actors.is_empty())
310 .map(|(fragment_id, _)| *fragment_id)
311 .collect();
312
313 let mut newly_scheduled = vec![];
314 for fragment_id in finished_fragments {
315 newly_scheduled.extend(self.finish_fragment(fragment_id));
316 }
317 newly_scheduled
318 }
319}