risingwave_meta/barrier/
backfill_order_control.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::FragmentTypeFlag;
18
19use crate::model::{FragmentId, StreamJobFragments};
20
/// Identifier used for actors throughout this module (keys of
/// `remaining_actors` and `actor_to_fragment_id`).
pub type ActorId = u32;
24
/// A single fragment taking part in ordered backfill, together with the
/// bookkeeping needed to decide when it and its children may proceed.
#[derive(Clone, Debug, Default)]
pub struct BackfillNode {
    /// Id of the fragment this node tracks.
    fragment_id: FragmentId,
    /// Actors of this fragment that have not been reported as finished yet.
    remaining_actors: HashSet<ActorId>,
    /// Fragments that list this fragment as a child in the backfill order and
    /// have not finished yet.
    remaining_dependencies: HashSet<FragmentId>,
    /// Fragments that the backfill order lists under this fragment.
    children: Vec<FragmentId>,
}
36
/// Tracks which backfill fragments are currently allowed to run and which
/// are still waiting on other fragments.
#[derive(Clone, Debug, Default)]
pub struct BackfillOrderState {
    /// Nodes with no unfinished dependencies, keyed by fragment id.
    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    /// Nodes that still had unfinished dependencies when the state was built,
    /// keyed by fragment id. `finish_fragment` looks children up here when a
    /// parent completes.
    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    /// Maps every actor to the fragment it belongs to.
    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
}
49
50pub fn get_nodes_with_backfill_dependencies(
53 backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
54) -> HashSet<FragmentId> {
55 backfill_orders.values().flatten().copied().collect()
56}
57
58impl BackfillOrderState {
60 pub fn new(
61 backfill_orders: HashMap<FragmentId, Vec<FragmentId>>,
62 stream_job_fragments: &StreamJobFragments,
63 ) -> Self {
64 tracing::debug!(?backfill_orders, "initialize backfill order state");
65 let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
66
67 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
68
69 for fragment in stream_job_fragments.fragments() {
70 if fragment.fragment_type_mask.contains_any([
71 FragmentTypeFlag::StreamScan,
72 FragmentTypeFlag::SourceScan,
73 FragmentTypeFlag::LocalityProvider,
74 ]) {
75 let fragment_id = fragment.fragment_id;
76 backfill_nodes.insert(
77 fragment_id,
78 BackfillNode {
79 fragment_id,
80 remaining_actors: stream_job_fragments
81 .fragment_actors(fragment_id)
82 .iter()
83 .map(|actor| actor.actor_id)
84 .collect(),
85 remaining_dependencies: Default::default(),
86 children: backfill_orders
87 .get(&fragment_id)
88 .cloned()
89 .unwrap_or_else(Vec::new),
90 },
91 );
92 }
93 }
94
95 for (fragment_id, children) in backfill_orders {
96 for child in &children {
97 let child_node = backfill_nodes.get_mut(child).unwrap();
98 child_node.remaining_dependencies.insert(fragment_id);
99 }
100 }
101
102 let mut current_backfill_nodes = HashMap::new();
103 let mut remaining_backfill_nodes = HashMap::new();
104 for (fragment_id, node) in backfill_nodes {
105 if node.remaining_dependencies.is_empty() {
106 current_backfill_nodes.insert(fragment_id, node);
107 } else {
108 remaining_backfill_nodes.insert(fragment_id, node);
109 }
110 }
111
112 Self {
113 current_backfill_nodes,
114 remaining_backfill_nodes,
115 actor_to_fragment_id,
116 }
117 }
118}
119
120impl BackfillOrderState {
122 pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
123 let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
124 tracing::error!(actor_id, "fragment not found for actor");
125 return vec![];
126 };
127 let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
138 Some(node) => (node, true),
139 None => {
140 let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
141 tracing::error!(
142 fragment_id,
143 actor_id,
144 "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
145 );
146 return vec![];
147 };
148 (node, false)
149 }
150 };
151
152 assert!(node.remaining_actors.remove(&actor_id), "missing actor");
153 tracing::debug!(
154 actor_id,
155 remaining_actors = node.remaining_actors.len(),
156 fragment_id,
157 "finish_backfilling_actor"
158 );
159 if node.remaining_actors.is_empty() && is_in_order {
160 self.finish_fragment(*fragment_id)
161 } else {
162 vec![]
163 }
164 }
165
166 pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
167 let mut newly_scheduled = vec![];
168 if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
171 for child_id in &node.children {
172 let newly_scheduled_child_finished = {
173 let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
174 assert!(
175 child.remaining_dependencies.remove(&fragment_id),
176 "missing dependency"
177 );
178 if child.remaining_dependencies.is_empty() {
179 tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
180 self.current_backfill_nodes
181 .insert(child.fragment_id, child.clone());
182 newly_scheduled.push(child.fragment_id);
183 child.remaining_actors.is_empty()
184 } else {
185 false
186 }
187 };
188 if newly_scheduled_child_finished {
189 newly_scheduled.extend(self.finish_fragment(*child_id));
190 }
191 }
192 } else {
193 tracing::error!(fragment_id, "fragment not found in current_backfill_nodes");
194 return vec![];
195 }
196 newly_scheduled
197 }
198}