risingwave_meta/barrier/backfill_order_control.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16
17use risingwave_common::catalog::FragmentTypeFlag;
18
19use crate::model::{FragmentId, StreamJobFragments};
20
/// Identifier of a streaming actor.
///
/// Every actor belongs to exactly one fragment; the backfill order state keeps an
/// actor → fragment mapping so a finished actor can be attributed to its fragment.
pub type ActorId = u32;
24
25#[derive(Clone, Debug, Default)]
26pub struct BackfillNode {
27    fragment_id: FragmentId,
28    /// How many more actors need to finish,
29    /// before this fragment can finish backfilling.
30    remaining_actors: HashSet<ActorId>,
31    /// How many more dependencies need to finish,
32    /// before this fragment can be backfilled.
33    remaining_dependencies: HashSet<FragmentId>,
34    children: Vec<FragmentId>,
35}
36
37/// Actor done                   -> update `fragment_id` state
38/// Operator done                -> update downstream operator dependency
39/// Operator's dependencies done -> queue operator for backfill
40#[derive(Clone, Debug, Default)]
41pub struct BackfillOrderState {
42    // The order plan.
43    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
44    // Remaining nodes to finish
45    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
46    // The mapping between actors and fragment_ids
47    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
48}
49
50/// Get nodes with some dependencies.
51/// These should initially be paused until their dependencies are done.
52pub fn get_nodes_with_backfill_dependencies(
53    backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
54) -> HashSet<FragmentId> {
55    backfill_orders.values().flatten().copied().collect()
56}
57
58// constructor
59impl BackfillOrderState {
60    pub fn new(
61        backfill_orders: HashMap<FragmentId, Vec<FragmentId>>,
62        stream_job_fragments: &StreamJobFragments,
63    ) -> Self {
64        tracing::debug!(?backfill_orders, "initialize backfill order state");
65        let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
66
67        let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
68
69        for fragment in stream_job_fragments.fragments() {
70            if fragment
71                .fragment_type_mask
72                .contains_any([FragmentTypeFlag::StreamScan, FragmentTypeFlag::SourceScan])
73            {
74                let fragment_id = fragment.fragment_id;
75                backfill_nodes.insert(
76                    fragment_id,
77                    BackfillNode {
78                        fragment_id,
79                        remaining_actors: stream_job_fragments
80                            .fragment_actors(fragment_id)
81                            .iter()
82                            .map(|actor| actor.actor_id)
83                            .collect(),
84                        remaining_dependencies: Default::default(),
85                        children: backfill_orders
86                            .get(&fragment_id)
87                            .cloned()
88                            .unwrap_or_else(Vec::new),
89                    },
90                );
91            }
92        }
93
94        for (fragment_id, children) in backfill_orders {
95            for child in &children {
96                let child_node = backfill_nodes.get_mut(child).unwrap();
97                child_node.remaining_dependencies.insert(fragment_id);
98            }
99        }
100
101        let mut current_backfill_nodes = HashMap::new();
102        let mut remaining_backfill_nodes = HashMap::new();
103        for (fragment_id, node) in backfill_nodes {
104            if node.remaining_dependencies.is_empty() {
105                current_backfill_nodes.insert(fragment_id, node);
106            } else {
107                remaining_backfill_nodes.insert(fragment_id, node);
108            }
109        }
110
111        Self {
112            current_backfill_nodes,
113            remaining_backfill_nodes,
114            actor_to_fragment_id,
115        }
116    }
117}
118
119// state transitions
120impl BackfillOrderState {
121    pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
122        let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id) else {
123            tracing::error!(actor_id, "fragment not found for actor");
124            return vec![];
125        };
126        // NOTE(kwannoel):
127        // Backfill order are specified by the user, for instance:
128        // t1->t2 means that t1 must be backfilled before t2.
129        // However, each snapshot executor may finish ahead of time if there's no data to backfill.
130        // For instance, if t2 has no data to backfill,
131        // and t1 has a lot of data to backfill,
132        // t2's scan operator might finish immediately,
133        // and t2 will finish before t1.
134        // In such cases, we should directly update it in remaining backfill nodes instead,
135        // so we should track whether a node finished in order.
136        let (node, is_in_order) = match self.current_backfill_nodes.get_mut(fragment_id) {
137            Some(node) => (node, true),
138            None => {
139                let Some(node) = self.remaining_backfill_nodes.get_mut(fragment_id) else {
140                    tracing::error!(
141                        fragment_id,
142                        actor_id,
143                        "fragment not found in current_backfill_nodes or remaining_backfill_nodes"
144                    );
145                    return vec![];
146                };
147                (node, false)
148            }
149        };
150
151        assert!(node.remaining_actors.remove(&actor_id), "missing actor");
152        tracing::debug!(
153            actor_id,
154            remaining_actors = node.remaining_actors.len(),
155            fragment_id,
156            "finish_backfilling_actor"
157        );
158        if node.remaining_actors.is_empty() && is_in_order {
159            self.finish_fragment(*fragment_id)
160        } else {
161            vec![]
162        }
163    }
164
165    pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
166        let mut newly_scheduled = vec![];
167        // Decrease the remaining_dependency_count of the children.
168        // If the remaining_dependency_count is 0, add the child to the current_backfill_nodes.
169        if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
170            for child_id in &node.children {
171                let newly_scheduled_child_finished = {
172                    let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
173                    assert!(
174                        child.remaining_dependencies.remove(&fragment_id),
175                        "missing dependency"
176                    );
177                    if child.remaining_dependencies.is_empty() {
178                        tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
179                        self.current_backfill_nodes
180                            .insert(child.fragment_id, child.clone());
181                        newly_scheduled.push(child.fragment_id);
182                        child.remaining_actors.is_empty()
183                    } else {
184                        false
185                    }
186                };
187                if newly_scheduled_child_finished {
188                    newly_scheduled.extend(self.finish_fragment(*child_id));
189                }
190            }
191        } else {
192            tracing::error!(fragment_id, "fragment not found in current_backfill_nodes");
193            return vec![];
194        }
195        newly_scheduled
196    }
197}