risingwave_meta/barrier/
backfill_order_control.rs1use std::collections::{HashMap, HashSet};
16
17use risingwave_pb::stream_plan::FragmentTypeFlag;
18
19use crate::model::{FragmentId, StreamJobFragments};
20
/// Identifier of an actor, represented as a `u32`.
pub type ActorId = u32;
24
/// Backfill bookkeeping for a single fragment tracked by [`BackfillOrderState`].
#[derive(Clone, Debug, Default)]
pub struct BackfillNode {
    /// Id of the fragment this node tracks.
    fragment_id: FragmentId,
    /// Actors of this fragment that have not yet been reported through
    /// [`BackfillOrderState::finish_actor`].
    remaining_actors: HashSet<ActorId>,
    /// Fragments that still have to finish before this fragment is scheduled;
    /// entries are removed by [`BackfillOrderState::finish_fragment`].
    remaining_dependencies: HashSet<FragmentId>,
    /// Fragments that list this fragment as a dependency. Their
    /// `remaining_dependencies` are updated when this fragment finishes.
    children: Vec<FragmentId>,
}
36
/// Tracks which fragments are currently backfilling and which ones are still
/// waiting on other fragments.
#[derive(Clone, Debug, Default)]
pub struct BackfillOrderState {
    /// Nodes with no remaining dependencies. Only actors belonging to these
    /// fragments are acted upon by `finish_actor`.
    current_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    /// Nodes that were created with at least one dependency. `finish_fragment`
    /// looks children up here to update their dependency sets.
    remaining_backfill_nodes: HashMap<FragmentId, BackfillNode>,
    /// Lookup from an actor to its fragment, taken from
    /// `StreamJobFragments::actor_fragment_mapping` at construction time.
    actor_to_fragment_id: HashMap<ActorId, FragmentId>,
}
49
50pub fn get_nodes_with_backfill_dependencies(
53 backfill_orders: &HashMap<FragmentId, Vec<FragmentId>>,
54) -> HashSet<FragmentId> {
55 backfill_orders.values().flatten().copied().collect()
56}
57
58impl BackfillOrderState {
60 pub fn new(
61 backfill_orders: HashMap<FragmentId, Vec<FragmentId>>,
62 stream_job_fragments: &StreamJobFragments,
63 ) -> Self {
64 tracing::debug!(?backfill_orders, "initialize backfill order state");
65 let actor_to_fragment_id = stream_job_fragments.actor_fragment_mapping();
66
67 let mut backfill_nodes: HashMap<FragmentId, BackfillNode> = HashMap::new();
68
69 for fragment in stream_job_fragments.fragments() {
70 if fragment.fragment_type_mask & (FragmentTypeFlag::StreamScan as u32) > 0 {
71 let fragment_id = fragment.fragment_id;
72 backfill_nodes.insert(
73 fragment_id,
74 BackfillNode {
75 fragment_id,
76 remaining_actors: stream_job_fragments
77 .fragment_actors(fragment_id)
78 .iter()
79 .map(|actor| actor.actor_id)
80 .collect(),
81 remaining_dependencies: Default::default(),
82 children: backfill_orders
83 .get(&fragment_id)
84 .cloned()
85 .unwrap_or_else(Vec::new),
86 },
87 );
88 }
89 }
90
91 for (fragment_id, children) in backfill_orders {
92 for child in &children {
93 let child_node = backfill_nodes.get_mut(child).unwrap();
94 child_node.remaining_dependencies.insert(fragment_id);
95 }
96 }
97
98 let mut current_backfill_nodes = HashMap::new();
99 let mut remaining_backfill_nodes = HashMap::new();
100 for (fragment_id, node) in backfill_nodes {
101 if node.remaining_dependencies.is_empty() {
102 current_backfill_nodes.insert(fragment_id, node);
103 } else {
104 remaining_backfill_nodes.insert(fragment_id, node);
105 }
106 }
107
108 Self {
109 current_backfill_nodes,
110 remaining_backfill_nodes,
111 actor_to_fragment_id,
112 }
113 }
114}
115
116impl BackfillOrderState {
118 pub fn finish_actor(&mut self, actor_id: ActorId) -> Vec<FragmentId> {
119 if let Some(fragment_id) = self.actor_to_fragment_id.get(&actor_id)
121 && let Some(node) = self.current_backfill_nodes.get_mut(fragment_id)
124 {
125 assert!(node.remaining_actors.remove(&actor_id), "missing actor");
126 tracing::debug!(
127 actor_id,
128 remaining_actors = node.remaining_actors.len(),
129 fragment_id,
130 "finish_backfilling_actor"
131 );
132 if node.remaining_actors.is_empty() {
133 return self.finish_fragment(*fragment_id);
134 }
135 }
136 vec![]
137 }
138
139 pub fn finish_fragment(&mut self, fragment_id: FragmentId) -> Vec<FragmentId> {
140 let mut newly_scheduled = vec![];
141 if let Some(node) = self.current_backfill_nodes.remove(&fragment_id) {
144 for child_id in &node.children {
145 let child = self.remaining_backfill_nodes.get_mut(child_id).unwrap();
146 assert!(
147 child.remaining_dependencies.remove(&fragment_id),
148 "missing dependency"
149 );
150 if child.remaining_dependencies.is_empty() {
151 tracing::debug!(fragment_id = ?child_id, "schedule next backfill node");
152 self.current_backfill_nodes
153 .insert(child.fragment_id, child.clone());
154 newly_scheduled.push(child.fragment_id)
155 }
156 }
157 }
158 newly_scheduled
159 }
160}