risingwave_meta/barrier/
backfill_order_control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::controller::fragment::InflightFragmentInfo;
21use crate::model::{FragmentId, StreamJobFragments};
22use crate::stream::ExtendedFragmentBackfillOrder;
23
24#[derive(Clone, Debug, Default)]
25pub struct BackfillNode {
26    fragment_id: FragmentId,
27    /// How many more actors need to finish,
28    /// before this fragment can finish backfilling.
29    remaining_actors: HashSet<ActorId>,
30    /// How many more dependencies need to finish,
31    /// before this fragment can be backfilled.
32    remaining_dependencies: HashSet<FragmentId>,
33    children: Vec<FragmentId>,
34}
35
36/// Actor done                   -> update `fragment_id` state
37/// Operator done                -> update downstream operator dependency
38/// Operator's dependencies done -> queue operator for backfill
39#[derive(Clone, Debug, Default)]
40pub struct BackfillOrderState {
41    // The order plan.
42    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
43    // Remaining nodes to finish
44    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
45    // The mapping between actors and fragment_ids
46    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
47    // The mapping between fragment_ids and table_ids of locality provider fragments
48    locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
49}
50
51/// Get nodes with some dependencies.
52/// These should initially be paused until their dependencies are done.
53pub fn get_nodes_with_backfill_dependencies(
54    backfill_orders: &ExtendedFragmentBackfillOrder,
55) -> HashSet<FragmentId> {
56    backfill_orders.values().flatten().copied().collect()
57}
58
59// constructor
60impl BackfillOrderState {
61    pub fn new(
62        backfill_orders: &ExtendedFragmentBackfillOrder,
63        stream_job_fragments: &StreamJobFragments,
64        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
65    ) -> Self {
66        tracing::debug!(?backfill_orders, "initialize backfill order state");
67        let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
68
69        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
70
71        for fragment in stream_job_fragments.fragments() {
72            if fragment.fragment_type_mask.contains_any([
73                FragmentTypeFlag::StreamScan,
74                FragmentTypeFlag::SourceScan,
75                FragmentTypeFlag::LocalityProvider,
76            ]) {
77                let fragment_id = fragment.fragment_id;
78                backfill_nodes.insert(
79                    fragment_id,
80                    BackfillNode {
81                        fragment_id,
82                        remaining_actors: stream_job_fragments
83                            .fragment_actors(fragment_id)
84                            .iter()
85                            .map(|actor| actor.actor_id)
86                            .collect(),
87                        remaining_dependencies: Default::default(),
88                        children: backfill_orders
89                            .get(&fragment_id)
90                            .cloned()
91                            .unwrap_or_else(Vec::new),
92                    },
93                );
94            }
95        }
96
97        for (fragment_id, children) in backfill_orders.iter() {
98            for child in children {
99                let child_node = backfill_nodes.get_mut(child).unwrap();
100                child_node.remaining_dependencies.insert(*fragment_id);
101            }
102        }
103
104        let mut current_backfill_nodes = HashMap::new();
105        let mut remaining_backfill_nodes = HashMap::new();
106        for (fragment_id, node) in backfill_nodes {
107            if node.remaining_dependencies.is_empty() {
108                current_backfill_nodes.insert(fragment_id, node);
109            } else {
110                remaining_backfill_nodes.insert(fragment_id, node);
111            }
112        }
113
114        Self {
115            current_backfill_nodes,
116            remaining_backfill_nodes,
117            actor_to_fragment_id,
118            locality_fragment_state_table_mapping,
119        }
120    }
121
122    pub fn recover_from_fragment_infos(
123        backfill_orders: &ExtendedFragmentBackfillOrder,
124        fragment_infos: &HashMap<FragmentId, InflightFragmentInfo>,
125        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
126    ) -> Self {
127        tracing::debug!(
128            ?backfill_orders,
129            "initialize backfill order state from recovery"
130        );
131        let actor_to_fragment_id = fragment_infos
132            .iter()
133            .flat_map(|(fragment_id, fragment)| {
134                fragment
135                    .actors
136                    .keys()
137                    .map(|actor_id| (*actor_id, *fragment_id))
138            })
139            .collect();
140
141        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
142
143        for (fragment_id, fragment) in fragment_infos {
144            if fragment.fragment_type_mask.contains_any([
145                FragmentTypeFlag::StreamScan,
146                FragmentTypeFlag::SourceScan,
147                FragmentTypeFlag::LocalityProvider,
148            ]) {
149                backfill_nodes.insert(
150                    *fragment_id,
151                    BackfillNode {
152                        fragment_id: *fragment_id,
153                        remaining_actors: fragment.actors.keys().copied().collect(),
154                        remaining_dependencies: Default::default(),
155                        children: backfill_orders
156                            .get(fragment_id)
157                            .cloned()
158                            .unwrap_or_else(Vec::new),
159                    },
160                );
161            }
162        }
163
164        for (fragment_id, children) in backfill_orders.iter() {
165            for child in children {
166                let child_node = backfill_nodes.get_mut(child).unwrap();
167                child_node.remaining_dependencies.insert(*fragment_id);
168            }
169        }
170
171        let mut current_backfill_nodes = HashMap::new();
172        let mut remaining_backfill_nodes = HashMap::new();
173        for (fragment_id, node) in backfill_nodes {
174            if node.remaining_dependencies.is_empty() {
175                current_backfill_nodes.insert(fragment_id, node);
176            } else {
177                remaining_backfill_nodes.insert(fragment_id, node);
178            }
179        }
180
181        Self {
182            current_backfill_nodes,
183            remaining_backfill_nodes,
184            actor_to_fragment_id,
185            locality_fragment_state_table_mapping,
186        }
187    }
188}
189
190// state transitions
191impl BackfillOrderState {
192    pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
193        let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
194            tracing::error!(%actor_id, "fragment not found for actor");
195            return vec![];
196        };
197        // NOTE(kwannoel):
198        // Backfill order are specified by the user, for instance:
199        // t1->t2 means that t1 must be backfilled before t2.
200        // However, each snapshot executor may finish ahead of time if there's no data to backfill.
201        // For instance, if t2 has no data to backfill,
202        // and t1 has a lot of data to backfill,
203        // t2's scan operator might finish immediately,
204        // and t2 will finish before t1.
205        // In such cases, we should directly update it in remaining backfill nodes instead,
206        // so we should track whether a node finished in order.
207        let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
208            Some(node) => (node, true),
209            None => {
210                let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
211                    tracing::error!(
212                        %fragment_id,
213                        %actor_id,
214                        "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
215                    );
216                    return vec![];
217                };
218                (node, false)
219            }
220        };
221
222        assert!(node.remaining_actors.remove(&actor_id), "missing actor");
223        tracing::debug!(
224            %actor_id,
225            remaining_actors = node.remaining_actors.len(),
226            %fragment_id,
227            "finish_backfilling_actor"
228        );
229        if node.remaining_actors.is_empty() && is_in_order {
230            self.finish_fragment(*fragment_id)
231        } else {
232            vec![]
233        }
234    }
235
236    pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
237        let mut newly_scheduled = vec![];
238        // Decrease the remaining_dependency_count of the children.
239        // If the remaining_dependency_count is 0, add the child to the current_backfill_nodes.
240        if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
241            for child_id in &node.children {
242                let newly_scheduled_child_finished = {
243                    let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
244                    assert!(
245                        child.remaining_dependencies.remove(&fragment_id),
246                        "missing dependency"
247                    );
248                    if child.remaining_dependencies.is_empty() {
249                        let should_schedule = !child.remaining_actors.is_empty();
250                        if should_schedule {
251                            tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
252                        }
253                        self.current_backfill_nodes
254                            .insert(child.fragment_id, child.clone());
255                        if should_schedule {
256                            newly_scheduled.push(child.fragment_id);
257                        }
258                        child.remaining_actors.is_empty()
259                    } else {
260                        false
261                    }
262                };
263                if newly_scheduled_child_finished {
264                    newly_scheduled.extend(self.finish_fragment(*child_id));
265                }
266            }
267        } else {
268            tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
269            return vec![];
270        }
271        newly_scheduled
272    }
273
274    pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
275        self.current_backfill_nodes.keys().copied().collect()
276    }
277
278    pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
279        &self.locality_fragment_state_table_mapping
280    }
281
282    /// Refresh actor mapping after reschedule and return newly scheduled fragments.
283    pub fn refresh_actors(
284        &mut self,
285        fragment_actors: &HashMap<FragmentId, HashSet<ActorId>>,
286    ) -> Vec<FragmentId> {
287        self.actor_to_fragment_id = fragment_actors
288            .iter()
289            .flat_map(|(fragment_id, actors)| {
290                actors.iter().map(|actor_id| (*actor_id, *fragment_id))
291            })
292            .collect();
293
294        for node in self
295            .current_backfill_nodes
296            .values_mut()
297            .chain(self.remaining_backfill_nodes.values_mut())
298        {
299            if let Some(actors) = fragment_actors.get(&node.fragment_id) {
300                node.remaining_actors = actors.iter().copied().collect();
301            } else {
302                node.remaining_actors.clear();
303            }
304        }
305
306        let finished_fragments: Vec<_> = self
307            .current_backfill_nodes
308            .iter()
309            .filter(|(_, node)| node.remaining_actors.is_empty())
310            .map(|(fragment_id, _)| *fragment_id)
311            .collect();
312
313        let mut newly_scheduled = vec![];
314        for fragment_id in finished_fragments {
315            newly_scheduled.extend(self.finish_fragment(fragment_id));
316        }
317        newly_scheduled
318    }
319}