risingwave_meta/barrier/
backfill_order_control.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::controller::fragment::InflightFragmentInfo;
21use crate::model::FragmentId;
22use crate::stream::ExtendedFragmentBackfillOrder;
23
/// One fragment in the backfill-order dependency graph.
///
/// Records which of the fragment's actors have not finished yet and which
/// other fragments it is still waiting on.
#[derive(Clone, Debug, Default)]
pub struct BackfillNode {
    /// Id of the fragment this node describes.
    fragment_id: FragmentId,
    /// Actors of this fragment that have not finished yet.
    /// The fragment has finished backfilling once this set is empty.
    remaining_actors: HashSet<ActorId>,
    /// Fragments that still need to finish
    /// before this fragment can be backfilled.
    remaining_dependencies: HashSet<FragmentId>,
    /// Fragments that list this fragment as a dependency, i.e. the entry of the
    /// backfill order keyed by `fragment_id` (empty when there is no such entry).
    children: Vec<FragmentId>,
}
35
/// Tracks which fragments may backfill now and which are still waiting on dependencies.
///
/// State transitions:
/// - Actor done                   -> update the state of the actor's fragment
/// - Fragment done                -> remove it from the dependencies of its children
/// - Fragment's dependencies done -> queue the fragment for backfill
#[derive(Clone, Debug, Default)]
pub struct BackfillOrderState {
    // Nodes whose dependencies have all finished, keyed by fragment id.
    // A node is removed from this map when its fragment finishes.
    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    // Nodes that were created with at least one unfinished dependency, keyed by fragment id.
    // NOTE(review): in the visible code a node is cloned into `current_backfill_nodes` once its
    // last dependency finishes, and nothing removes it from this map afterwards.
    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    // The mapping between actors and the fragment each one belongs to
    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
    // The mapping between fragment_ids and table_ids of locality provider fragments
    locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
}
50
51/// Get nodes with some dependencies.
52/// These should initially be paused until their dependencies are done.
53pub fn get_nodes_with_backfill_dependencies(
54    backfill_orders: &ExtendedFragmentBackfillOrder,
55) -> HashSet<FragmentId> {
56    backfill_orders.values().flatten().copied().collect()
57}
58
59// constructor
60impl BackfillOrderState {
61    pub fn new(
62        backfill_orders: &ExtendedFragmentBackfillOrder,
63        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
64        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
65    ) -> Self {
66        tracing::debug!(?backfill_orders, "initialize backfill order state");
67        let actor_to_fragment_id: HashMap<ActorId, FragmentId> = fragment_infos
68            .iter()
69            .flat_map(|(fragment_id, fragment)| {
70                fragment
71                    .actors
72                    .keys()
73                    .map(|actor_id| (*actor_id, *fragment_id))
74            })
75            .collect();
76
77        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
78
79        for fragment in fragment_infos.values() {
80            if fragment.fragment_type_mask.contains_any([
81                FragmentTypeFlag::StreamScan,
82                FragmentTypeFlag::SourceScan,
83                FragmentTypeFlag::LocalityProvider,
84            ]) {
85                let fragment_id = fragment.fragment_id;
86                backfill_nodes.insert(
87                    fragment_id,
88                    BackfillNode {
89                        fragment_id,
90                        remaining_actors: fragment.actors.keys().copied().collect(),
91                        remaining_dependencies: Default::default(),
92                        children: backfill_orders
93                            .get(&fragment_id)
94                            .cloned()
95                            .unwrap_or_else(Vec::new),
96                    },
97                );
98            }
99        }
100
101        for (fragment_id, children) in backfill_orders.iter() {
102            for child in children {
103                let child_node = backfill_nodes.get_mut(child).unwrap();
104                child_node.remaining_dependencies.insert(*fragment_id);
105            }
106        }
107
108        let mut current_backfill_nodes = HashMap::new();
109        let mut remaining_backfill_nodes = HashMap::new();
110        for (fragment_id, node) in backfill_nodes {
111            if node.remaining_dependencies.is_empty() {
112                current_backfill_nodes.insert(fragment_id, node);
113            } else {
114                remaining_backfill_nodes.insert(fragment_id, node);
115            }
116        }
117
118        Self {
119            current_backfill_nodes,
120            remaining_backfill_nodes,
121            actor_to_fragment_id,
122            locality_fragment_state_table_mapping,
123        }
124    }
125
126    pub fn recover_from_fragment_infos(
127        backfill_orders: &ExtendedFragmentBackfillOrder,
128        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
129        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
130    ) -> Self {
131        tracing::debug!(
132            ?backfill_orders,
133            "initialize backfill order state from recovery"
134        );
135        let actor_to_fragment_id = fragment_infos
136            .iter()
137            .flat_map(|(fragment_id, fragment)| {
138                fragment
139                    .actors
140                    .keys()
141                    .map(|actor_id| (*actor_id, *fragment_id))
142            })
143            .collect();
144
145        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
146
147        for (fragment_id, fragment) in fragment_infos {
148            if fragment.fragment_type_mask.contains_any([
149                FragmentTypeFlag::StreamScan,
150                FragmentTypeFlag::SourceScan,
151                FragmentTypeFlag::LocalityProvider,
152            ]) {
153                backfill_nodes.insert(
154                    *fragment_id,
155                    BackfillNode {
156                        fragment_id: *fragment_id,
157                        remaining_actors: fragment.actors.keys().copied().collect(),
158                        remaining_dependencies: Default::default(),
159                        children: backfill_orders
160                            .get(fragment_id)
161                            .cloned()
162                            .unwrap_or_else(Vec::new),
163                    },
164                );
165            }
166        }
167
168        for (fragment_id, children) in backfill_orders.iter() {
169            for child in children {
170                let child_node = backfill_nodes.get_mut(child).unwrap();
171                child_node.remaining_dependencies.insert(*fragment_id);
172            }
173        }
174
175        let mut current_backfill_nodes = HashMap::new();
176        let mut remaining_backfill_nodes = HashMap::new();
177        for (fragment_id, node) in backfill_nodes {
178            if node.remaining_dependencies.is_empty() {
179                current_backfill_nodes.insert(fragment_id, node);
180            } else {
181                remaining_backfill_nodes.insert(fragment_id, node);
182            }
183        }
184
185        Self {
186            current_backfill_nodes,
187            remaining_backfill_nodes,
188            actor_to_fragment_id,
189            locality_fragment_state_table_mapping,
190        }
191    }
192}
193
194// state transitions
195impl BackfillOrderState {
196    pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
197        let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
198            tracing::error!(%actor_id, "fragment not found for actor");
199            return vec![];
200        };
201        // NOTE(kwannoel):
202        // Backfill order are specified by the user, for instance:
203        // t1->t2 means that t1 must be backfilled before t2.
204        // However, each snapshot executor may finish ahead of time if there's no data to backfill.
205        // For instance, if t2 has no data to backfill,
206        // and t1 has a lot of data to backfill,
207        // t2's scan operator might finish immediately,
208        // and t2 will finish before t1.
209        // In such cases, we should directly update it in remaining backfill nodes instead,
210        // so we should track whether a node finished in order.
211        let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
212            Some(node) => (node, true),
213            None => {
214                let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
215                    tracing::error!(
216                        %fragment_id,
217                        %actor_id,
218                        "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
219                    );
220                    return vec![];
221                };
222                (node, false)
223            }
224        };
225
226        assert!(node.remaining_actors.remove(&actor_id), "missing actor");
227        tracing::debug!(
228            %actor_id,
229            remaining_actors = node.remaining_actors.len(),
230            %fragment_id,
231            "finish_backfilling_actor"
232        );
233        if node.remaining_actors.is_empty() && is_in_order {
234            self.finish_fragment(*fragment_id)
235        } else {
236            vec![]
237        }
238    }
239
240    pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
241        let mut newly_scheduled = vec![];
242        // Decrease the remaining_dependency_count of the children.
243        // If the remaining_dependency_count is 0, add the child to the current_backfill_nodes.
244        if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
245            for child_id in &node.children {
246                let newly_scheduled_child_finished = {
247                    let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
248                    assert!(
249                        child.remaining_dependencies.remove(&fragment_id),
250                        "missing dependency"
251                    );
252                    if child.remaining_dependencies.is_empty() {
253                        let should_schedule = !child.remaining_actors.is_empty();
254                        if should_schedule {
255                            tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
256                        }
257                        self.current_backfill_nodes
258                            .insert(child.fragment_id, child.clone());
259                        if should_schedule {
260                            newly_scheduled.push(child.fragment_id);
261                        }
262                        child.remaining_actors.is_empty()
263                    } else {
264                        false
265                    }
266                };
267                if newly_scheduled_child_finished {
268                    newly_scheduled.extend(self.finish_fragment(*child_id));
269                }
270            }
271        } else {
272            tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
273            return vec![];
274        }
275        newly_scheduled
276    }
277
278    pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
279        self.current_backfill_nodes.keys().copied().collect()
280    }
281
282    pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
283        &self.locality_fragment_state_table_mapping
284    }
285
286    /// Refresh actor mapping after reschedule and return newly scheduled fragments.
287    pub fn refresh_actors(
288        &mut self,
289        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
290    ) -> Vec<FragmentId> {
291        self.actor_to_fragment_id = fragment_actors
292            .iter()
293            .flat_map(|(fragment_id, actors)| {
294                actors.iter().map(|actor_id| (*actor_id, *fragment_id))
295            })
296            .collect();
297
298        for node in self
299            .current_backfill_nodes
300            .values_mut()
301            .chain(self.remaining_backfill_nodes.values_mut())
302        {
303            if let Some(actors) = fragment_actors.get(&node.fragment_id) {
304                node.remaining_actors = actors.iter().copied().collect();
305            } else {
306                node.remaining_actors.clear();
307            }
308        }
309
310        let finished_fragments: Vec<_> = self
311            .current_backfill_nodes
312            .iter()
313            .filter(|(_, node)| node.remaining_actors.is_empty())
314            .map(|(fragment_id, _)| *fragment_id)
315            .collect();
316
317        let mut newly_scheduled = vec![];
318        for fragment_id in finished_fragments {
319            newly_scheduled.extend(self.finish_fragment(fragment_id));
320        }
321        newly_scheduled
322    }
323}