risingwave_meta/barrier/
backfill_order_control.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::FragmentTypeFlag;
18
19use crate::model::{FragmentId, StreamJobFragments};
20
21pub type ActorId = u32;
24
25#[derive(Clone, Debug, Default)]
26pub struct BackfillNode {
27 fragment_id: FragmentId,
28 remaining_actors: HashSet<ActorId>,
31 remaining_dependencies: HashSet<FragmentId>,
34 children: Vec<FragmentId>,
35}
36
37#[derive(Clone, Debug, Default)]
41pub struct BackfillOrderState {
42 current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
44 remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
46 actor_to_fragment_id: HashMap<ActorId, FragmentId>,
48}
49
50pub fn get_nodes_with_backfill_dependencies(
53 backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
54) -> HashSet<FragmentId> {
55 backfill_orders.values().flatten().copied().collect()
56}
57
58impl BackfillOrderState {
60 pub fn new(
61 backfill_orders: HashMap<FragmentId, Vec<FragmentId>>,
62 stream_job_fragments: &StreamJobFragments,
63 ) -> Self {
64 tracing::debug!(?backfill_orders, "initialize backfill order state");
65 let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
66
67 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
68
69 for fragment in stream_job_fragments.fragments() {
70 if fragment
71 .fragment_type_mask
72 .contains_any([FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan])
73 {
74 let fragment_id = fragment.fragment_id;
75 backfill_nodes.insert(
76 fragment_id,
77 BackfillNode {
78 fragment_id,
79 remaining_actors: stream_job_fragments
80 .fragment_actors(fragment_id)
81 .iter()
82 .map(|actor| actor.actor_id)
83 .collect(),
84 remaining_dependencies: Default::default(),
85 children: backfill_orders
86 .get(&fragment_id)
87 .cloned()
88 .unwrap_or_else(Vec::new),
89 },
90 );
91 }
92 }
93
94 for (fragment_id, children) in backfill_orders {
95 for child in &children {
96 let child_node = backfill_nodes.get_mut(child).unwrap();
97 child_node.remaining_dependencies.insert(fragment_id);
98 }
99 }
100
101 let mut current_backfill_nodes = HashMap::new();
102 let mut remaining_backfill_nodes = HashMap::new();
103 for (fragment_id, node) in backfill_nodes {
104 if node.remaining_dependencies.is_empty() {
105 current_backfill_nodes.insert(fragment_id, node);
106 } else {
107 remaining_backfill_nodes.insert(fragment_id, node);
108 }
109 }
110
111 Self {
112 current_backfill_nodes,
113 remaining_backfill_nodes,
114 actor_to_fragment_id,
115 }
116 }
117}
118
119impl BackfillOrderState {
121 pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
122 let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
123 tracing::error!(actor_id, "fragment not found for actor");
124 return vec![];
125 };
126 let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
137 Some(node) => (node, true),
138 None => {
139 let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
140 tracing::error!(
141 fragment_id,
142 actor_id,
143 "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
144 );
145 return vec![];
146 };
147 (node, false)
148 }
149 };
150
151 assert!(node.remaining_actors.remove(&actor_id), "missing actor");
152 tracing::debug!(
153 actor_id,
154 remaining_actors = node.remaining_actors.len(),
155 fragment_id,
156 "finish_backfilling_actor"
157 );
158 if node.remaining_actors.is_empty() && is_in_order {
159 self.finish_fragment(*fragment_id)
160 } else {
161 vec![]
162 }
163 }
164
165 pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
166 let mut newly_scheduled = vec![];
167 if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
170 for child_id in &node.children {
171 let newly_scheduled_child_finished = {
172 let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
173 assert!(
174 child.remaining_dependencies.remove(&fragment_id),
175 "missing dependency"
176 );
177 if child.remaining_dependencies.is_empty() {
178 tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
179 self.current_backfill_nodes
180 .insert(child.fragment_id, child.clone());
181 newly_scheduled.push(child.fragment_id);
182 child.remaining_actors.is_empty()
183 } else {
184 false
185 }
186 };
187 if newly_scheduled_child_finished {
188 newly_scheduled.extend(self.finish_fragment(*child_id));
189 }
190 }
191 } else {
192 tracing::error!(fragment_id, "fragment not found in current_backfill_nodes");
193 return vec![];
194 }
195 newly_scheduled
196 }
197}