risingwave_meta/barrier/
backfill_order_control.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::{FragmentTypeFlag, TableId};
18pub use risingwave_common::id::ActorId;
19
20use crate::model::{FragmentId, StreamJobFragments};
21
22#[derive(Clone, Debug, Default)]
23pub struct BackfillNode {
24    fragment_id: FragmentId,
25    /// How many more actors need to finish,
26    /// before this fragment can finish backfilling.
27    remaining_actors: HashSet<ActorId>,
28    /// How many more dependencies need to finish,
29    /// before this fragment can be backfilled.
30    remaining_dependencies: HashSet<FragmentId>,
31    children: Vec<FragmentId>,
32}
33
34/// Actor done                   -> update `fragment_id` state
35/// Operator done                -> update downstream operator dependency
36/// Operator's dependencies done -> queue operator for backfill
37#[derive(Clone, Debug, Default)]
38pub struct BackfillOrderState {
39    // The order plan.
40    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
41    // Remaining nodes to finish
42    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
43    // The mapping between actors and fragment_ids
44    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
45    // The mapping between fragment_ids and table_ids of locality provider fragments
46    locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
47}
48
49/// Get nodes with some dependencies.
50/// These should initially be paused until their dependencies are done.
51pub fn get_nodes_with_backfill_dependencies(
52    backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
53) -> HashSet<FragmentId> {
54    backfill_orders.values().flatten().copied().collect()
55}
56
57// constructor
58impl BackfillOrderState {
59    pub fn new(
60        backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
61        stream_job_fragments: &StreamJobFragments,
62        locality_fragment_state_table_mapping: HashMap<FragmentId, Vec<TableId>>,
63    ) -> Self {
64        tracing::debug!(?backfill_orders, "initialize backfill order state");
65        let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
66
67        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
68
69        for fragment in stream_job_fragments.fragments() {
70            if fragment.fragment_type_mask.contains_any([
71                FragmentTypeFlag::StreamScan,
72                FragmentTypeFlag::SourceScan,
73                FragmentTypeFlag::LocalityProvider,
74            ]) {
75                let fragment_id = fragment.fragment_id;
76                backfill_nodes.insert(
77                    fragment_id,
78                    BackfillNode {
79                        fragment_id,
80                        remaining_actors: stream_job_fragments
81                            .fragment_actors(fragment_id)
82                            .iter()
83                            .map(|actor| actor.actor_id)
84                            .collect(),
85                        remaining_dependencies: Default::default(),
86                        children: backfill_orders
87                            .get(&fragment_id)
88                            .cloned()
89                            .unwrap_or_else(Vec::new),
90                    },
91                );
92            }
93        }
94
95        for (fragment_id, children) in backfill_orders {
96            for child in children {
97                let child_node = backfill_nodes.get_mut(child).unwrap();
98                child_node.remaining_dependencies.insert(*fragment_id);
99            }
100        }
101
102        let mut current_backfill_nodes = HashMap::new();
103        let mut remaining_backfill_nodes = HashMap::new();
104        for (fragment_id, node) in backfill_nodes {
105            if node.remaining_dependencies.is_empty() {
106                current_backfill_nodes.insert(fragment_id, node);
107            } else {
108                remaining_backfill_nodes.insert(fragment_id, node);
109            }
110        }
111
112        Self {
113            current_backfill_nodes,
114            remaining_backfill_nodes,
115            actor_to_fragment_id,
116            locality_fragment_state_table_mapping,
117        }
118    }
119}
120
121// state transitions
122impl BackfillOrderState {
123    pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
124        let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
125            tracing::error!(%actor_id, "fragment not found for actor");
126            return vec![];
127        };
128        // NOTE(kwannoel):
129        // Backfill order are specified by the user, for instance:
130        // t1->t2 means that t1 must be backfilled before t2.
131        // However, each snapshot executor may finish ahead of time if there's no data to backfill.
132        // For instance, if t2 has no data to backfill,
133        // and t1 has a lot of data to backfill,
134        // t2's scan operator might finish immediately,
135        // and t2 will finish before t1.
136        // In such cases, we should directly update it in remaining backfill nodes instead,
137        // so we should track whether a node finished in order.
138        let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
139            Some(node) => (node, true),
140            None => {
141                let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
142                    tracing::error!(
143                        %fragment_id,
144                        %actor_id,
145                        "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
146                    );
147                    return vec![];
148                };
149                (node, false)
150            }
151        };
152
153        assert!(node.remaining_actors.remove(&actor_id), "missing actor");
154        tracing::debug!(
155            %actor_id,
156            remaining_actors = node.remaining_actors.len(),
157            %fragment_id,
158            "finish_backfilling_actor"
159        );
160        if node.remaining_actors.is_empty() && is_in_order {
161            self.finish_fragment(*fragment_id)
162        } else {
163            vec![]
164        }
165    }
166
167    pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
168        let mut newly_scheduled = vec![];
169        // Decrease the remaining_dependency_count of the children.
170        // If the remaining_dependency_count is 0, add the child to the current_backfill_nodes.
171        if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
172            for child_id in &node.children {
173                let newly_scheduled_child_finished = {
174                    let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
175                    assert!(
176                        child.remaining_dependencies.remove(&fragment_id),
177                        "missing dependency"
178                    );
179                    if child.remaining_dependencies.is_empty() {
180                        tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
181                        self.current_backfill_nodes
182                            .insert(child.fragment_id, child.clone());
183                        newly_scheduled.push(child.fragment_id);
184                        child.remaining_actors.is_empty()
185                    } else {
186                        false
187                    }
188                };
189                if newly_scheduled_child_finished {
190                    newly_scheduled.extend(self.finish_fragment(*child_id));
191                }
192            }
193        } else {
194            tracing::error!(%fragment_id, "fragment not found in current_backfill_nodes");
195            return vec![];
196        }
197        newly_scheduled
198    }
199
200    pub fn current_backfill_node_fragment_ids(&self) -> Vec<FragmentId> {
201        self.current_backfill_nodes.keys().copied().collect()
202    }
203
204    pub fn get_locality_fragment_state_table_mapping(&self) -> &HashMap<FragmentId, Vec<TableId>> {
205        &self.locality_fragment_state_table_mapping
206    }
207}