risingwave_meta/barrier/backfill_order_control.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::controller::fragment::InflightFragmentInfo;
21use crate::model::{FragmentId, StreamJobFragments};
22
23#[derive(Clone, Debug, Default)]
24pub struct BackfillNode {
25    fragment_id: FragmentId,
26    /// How many more actors need to finish,
27    /// before this fragment can finish backfilling.
28    remaining_actors: HashSet<ActorId>,
29    /// How many more dependencies need to finish,
30    /// before this fragment can be backfilled.
31    remaining_dependencies: HashSet<FragmentId>,
32    children: Vec<FragmentId>,
33}
34
35/// Actor done                   -> update `fragment_id` state
36/// Operator done                -> update downstream operator dependency
37/// Operator's dependencies done -> queue operator for backfill
38#[derive(Clone, Debug, Default)]
39pub struct BackfillOrderState {
40    // The order plan.
41    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
42    // Remaining nodes to finish
43    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
44    // The mapping between actors and fragment_ids
45    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
46    // The mapping between fragment_ids and table_ids of locality provider fragments
47    locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
48}
49
50/// Get nodes with some dependencies.
51/// These should initially be paused until their dependencies are done.
52pub fn get_nodes_with_backfill_dependencies(
53    backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
54) -> HashSet<FragmentId> {
55    backfill_orders.values().flatten().copied().collect()
56}
57
58// constructor
59impl BackfillOrderState {
60    pub fn new(
61        backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
62        stream_job_fragments: &StreamJobFragments,
63        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
64    ) -> Self {
65        tracing::debug!(?backfill_orders, "initialize backfill order state");
66        let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
67
68        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
69
70        for fragment in stream_job_fragments.fragments() {
71            if fragment.fragment_type_mask.contains_any([
72                FragmentTypeFlag::StreamScan,
73                FragmentTypeFlag::SourceScan,
74                FragmentTypeFlag::LocalityProvider,
75            ]) {
76                let fragment_id = fragment.fragment_id;
77                backfill_nodes.insert(
78                    fragment_id,
79                    BackfillNode {
80                        fragment_id,
81                        remaining_actors: stream_job_fragments
82                            .fragment_actors(fragment_id)
83                            .iter()
84                            .map(|actor| actor.actor_id)
85                            .collect(),
86                        remaining_dependencies: Default::default(),
87                        children: backfill_orders
88                            .get(&fragment_id)
89                            .cloned()
90                            .unwrap_or_else(Vec::new),
91                    },
92                );
93            }
94        }
95
96        for (fragment_id, children) in backfill_orders {
97            for child in children {
98                let child_node = backfill_nodes.get_mut(child).unwrap();
99                child_node.remaining_dependencies.insert(*fragment_id);
100            }
101        }
102
103        let mut current_backfill_nodes = HashMap::new();
104        let mut remaining_backfill_nodes = HashMap::new();
105        for (fragment_id, node) in backfill_nodes {
106            if node.remaining_dependencies.is_empty() {
107                current_backfill_nodes.insert(fragment_id, node);
108            } else {
109                remaining_backfill_nodes.insert(fragment_id, node);
110            }
111        }
112
113        Self {
114            current_backfill_nodes,
115            remaining_backfill_nodes,
116            actor_to_fragment_id,
117            locality_fragment_state_table_mapping,
118        }
119    }
120
121    pub fn recover_from_fragment_infos(
122        backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
123        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
124        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
125    ) -> Self {
126        tracing::debug!(
127            ?backfill_orders,
128            "initialize backfill order state from recovery"
129        );
130        let actor_to_fragment_id = fragment_infos
131            .iter()
132            .flat_map(|(fragment_id, fragment)| {
133                fragment
134                    .actors
135                    .keys()
136                    .map(|actor_id| (*actor_id, *fragment_id))
137            })
138            .collect();
139
140        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
141
142        for (fragment_id, fragment) in fragment_infos {
143            if fragment.fragment_type_mask.contains_any([
144                FragmentTypeFlag::StreamScan,
145                FragmentTypeFlag::SourceScan,
146                FragmentTypeFlag::LocalityProvider,
147            ]) {
148                backfill_nodes.insert(
149                    *fragment_id,
150                    BackfillNode {
151                        fragment_id: *fragment_id,
152                        remaining_actors: fragment.actors.keys().copied().collect(),
153                        remaining_dependencies: Default::default(),
154                        children: backfill_orders
155                            .get(fragment_id)
156                            .cloned()
157                            .unwrap_or_else(Vec::new),
158                    },
159                );
160            }
161        }
162
163        for (fragment_id, children) in backfill_orders {
164            for child in children {
165                let child_node = backfill_nodes.get_mut(child).unwrap();
166                child_node.remaining_dependencies.insert(*fragment_id);
167            }
168        }
169
170        let mut current_backfill_nodes = HashMap::new();
171        let mut remaining_backfill_nodes = HashMap::new();
172        for (fragment_id, node) in backfill_nodes {
173            if node.remaining_dependencies.is_empty() {
174                current_backfill_nodes.insert(fragment_id, node);
175            } else {
176                remaining_backfill_nodes.insert(fragment_id, node);
177            }
178        }
179
180        Self {
181            current_backfill_nodes,
182            remaining_backfill_nodes,
183            actor_to_fragment_id,
184            locality_fragment_state_table_mapping,
185        }
186    }
187}
188
189// state transitions
190impl BackfillOrderState {
191    pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
192        let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
193            tracing::error!(%actor_id, "fragment not found for actor");
194            return vec![];
195        };
196        // NOTE(kwannoel):
197        // Backfill order are specified by the user, for instance:
198        // t1->t2 means that t1 must be backfilled before t2.
199        // However, each snapshot executor may finish ahead of time if there's no data to backfill.
200        // For instance, if t2 has no data to backfill,
201        // and t1 has a lot of data to backfill,
202        // t2's scan operator might finish immediately,
203        // and t2 will finish before t1.
204        // In such cases, we should directly update it in remaining backfill nodes instead,
205        // so we should track whether a node finished in order.
206        let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
207            Some(node) => (node, true),
208            None => {
209                let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
210                    tracing::error!(
211                        %fragment_id,
212                        %actor_id,
213                        "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
214                    );
215                    return vec![];
216                };
217                (node, false)
218            }
219        };
220
221        assert!(node.remaining_actors.remove(&actor_id), "missing actor");
222        tracing::debug!(
223            %actor_id,
224            remaining_actors = node.remaining_actors.len(),
225            %fragment_id,
226            "finish_backfilling_actor"
227        );
228        if node.remaining_actors.is_empty() && is_in_order {
229            self.finish_fragment(*fragment_id)
230        } else {
231            vec![]
232        }
233    }
234
235    pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
236        let mut newly_scheduled = vec![];
237        // Decrease the remaining_dependency_count of the children.
238        // If the remaining_dependency_count is 0, add the child to the current_backfill_nodes.
239        if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
240            for child_id in &node.children {
241                let newly_scheduled_child_finished = {
242                    let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
243                    assert!(
244                        child.remaining_dependencies.remove(&fragment_id),
245                        "missing dependency"
246                    );
247                    if child.remaining_dependencies.is_empty() {
248                        tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
249                        self.current_backfill_nodes
250                            .insert(child.fragment_id, child.clone());
251                        newly_scheduled.push(child.fragment_id);
252                        child.remaining_actors.is_empty()
253                    } else {
254                        false
255                    }
256                };
257                if newly_scheduled_child_finished {
258                    newly_scheduled.extend(self.finish_fragment(*child_id));
259                }
260            }
261        } else {
262            tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
263            return vec![];
264        }
265        newly_scheduled
266    }
267
268    pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
269        self.current_backfill_nodes.keys().copied().collect()
270    }
271
272    pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
273        &self.locality_fragment_state_table_mapping
274    }
275
276    /// Refresh actor mapping after reschedule and return newly scheduled fragments.
277    pub fn refresh_actors(
278        &mut self,
279        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
280    ) -> Vec<FragmentId> {
281        self.actor_to_fragment_id = fragment_actors
282            .iter()
283            .flat_map(|(fragment_id, actors)| {
284                actors.iter().map(|actor_id| (*actor_id, *fragment_id))
285            })
286            .collect();
287
288        for node in self
289            .current_backfill_nodes
290            .values_mut()
291            .chain(self.remaining_backfill_nodes.values_mut())
292        {
293            if let Some(actors) = fragment_actors.get(&node.fragment_id) {
294                node.remaining_actors = actors.iter().copied().collect();
295            } else {
296                node.remaining_actors.clear();
297            }
298        }
299
300        let finished_fragments: Vec<_> = self
301            .current_backfill_nodes
302            .iter()
303            .filter(|(_, node)| node.remaining_actors.is_empty())
304            .map(|(fragment_id, _)| *fragment_id)
305            .collect();
306
307        let mut newly_scheduled = vec![];
308        for fragment_id in finished_fragments {
309            newly_scheduled.extend(self.finish_fragment(fragment_id));
310        }
311        newly_scheduled
312    }
313}