use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::fragment::FragmentParallelismInfo;
use crate::controller::session_params::SessionParamsControllerRef;
use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
use crate::model::FragmentId;
pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;

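/// A thread-safe map from fragment id to the worker slot mapping used to serve
/// batch queries.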
#[derive(Default)]
pub struct ServingVnodeMapping {
    serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
}

impl ServingVnodeMapping {
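    /// Returns a snapshot of all current serving vnode mappings.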
    pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
        self.serving_vnode_mappings.read().clone()
    }

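    /// Upserts the serving vnode mapping of each given fragment, placing vnodes onto
    /// the given serving workers according to the fragment's distribution type.
    ///
    /// Returns the successfully placed mappings together with the ids of fragments
    /// whose placement failed, e.g. when no serving worker slot is available.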
    pub fn upsert(
        &self,
        streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
        workers: &[WorkerNode],
        max_serving_parallelism: Option<u64>,
    ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
        let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
        let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
        let mut failed: Vec<FragmentId> = vec![];
        for (fragment_id, info) in streaming_parallelisms {
            let new_mapping = {
                let old_mapping = serving_vnode_mappings.get(&fragment_id);
                // A singleton fragment is served by exactly one worker slot; a
                // hash-distributed one is capped only by the session's serving
                // parallelism limit, if any.
                let max_parallelism = match info.distribution_type {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => Some(1),
                    FragmentDistributionType::Hash => None,
                }
                .or_else(|| max_serving_parallelism.map(|p| p as usize));
                place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
            };
            match new_mapping {
                None => {
                    // Placement failed: drop any stale mapping and report the fragment.
                    serving_vnode_mappings.remove(&fragment_id);
                    failed.push(fragment_id);
                }
                Some(mapping) => {
                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
                    upserted.insert(fragment_id, mapping);
                }
            }
        }
        (upserted, failed)
    }

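    /// Removes the serving vnode mappings of the given fragments, if present.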
    fn remove(&self, fragment_ids: &[FragmentId]) {
        let mut mappings = self.serving_vnode_mappings.write();
        for fragment_id in fragment_ids {
            mappings.remove(fragment_id);
        }
    }
}

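/// Converts the given mappings into their protobuf representation for frontend
/// notifications.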
pub(crate) fn to_fragment_worker_slot_mapping(
    mappings: &HashMap<FragmentId, WorkerSlotMapping>,
) -> Vec<FragmentWorkerSlotMapping> {
    mappings
        .iter()
        .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
            fragment_id: *fragment_id,
            mapping: Some(mapping.to_protobuf()),
        })
        .collect()
}

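/// Builds tombstone entries (`mapping: None`) that instruct frontends to drop the
/// serving mappings of the given fragments.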
pub(crate) fn to_deleted_fragment_worker_slot_mapping(
    fragment_ids: &[FragmentId],
) -> Vec<FragmentWorkerSlotMapping> {
    fragment_ids
        .iter()
        .map(|fragment_id| FragmentWorkerSlotMapping {
            fragment_id: *fragment_id,
            mapping: None,
        })
        .collect()
}

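/// Builds the initial serving vnode mapping snapshot on meta node startup and
/// broadcasts it to all frontends.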
pub async fn on_meta_start(
    notification_manager: NotificationManagerRef,
    metadata_manager: &MetadataManager,
    serving_vnode_mapping: ServingVnodeMappingRef,
    max_serving_parallelism: Option<u64>,
) {
    let (serving_compute_nodes, streaming_parallelisms) =
        fetch_serving_infos(metadata_manager).await;
    let (mappings, failed) = serving_vnode_mapping.upsert(
        streaming_parallelisms,
        &serving_compute_nodes,
        max_serving_parallelism,
    );
    tracing::debug!(
        "Initialized serving vnode mapping snapshot for fragments {:?}.",
        mappings.keys()
    );
    if !failed.is_empty() {
        tracing::warn!(
            "Failed to update serving vnode mapping for fragments {:?}.",
            failed
        );
    }
    notification_manager.notify_frontend_without_version(
        Operation::Snapshot,
        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
            mappings: to_fragment_worker_slot_mapping(&mappings),
        }),
    );
}

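/// Fetches the active serving compute nodes and the parallelism info of all running
/// fragments from the metadata manager.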
async fn fetch_serving_infos(
    metadata_manager: &MetadataManager,
) -> (
    Vec<WorkerNode>,
    HashMap<FragmentId, FragmentParallelismInfo>,
) {
    let parallelisms = metadata_manager
        .catalog_controller
        .running_fragment_parallelisms(None)
        .await
        .expect("failed to fetch running parallelisms");
    let serving_compute_nodes = metadata_manager
        .cluster_controller
        .list_active_serving_workers()
        .await
        .expect("failed to list serving compute nodes");
    (
        serving_compute_nodes,
        parallelisms
            .into_iter()
            .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
            .collect(),
    )
}

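/// Spawns a background worker that keeps the serving vnode mappings up to date.
/// Worker node changes and batch parallelism changes trigger a full recomputation,
/// while fragment mapping upserts and deletes are applied incrementally. Returns the
/// task's join handle and a shutdown sender.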
pub async fn start_serving_vnode_mapping_worker(
    notification_manager: NotificationManagerRef,
    metadata_manager: MetadataManager,
    serving_vnode_mapping: ServingVnodeMappingRef,
    session_params: SessionParamsControllerRef,
) -> (JoinHandle<()>, Sender<()>) {
    let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    notification_manager
        .insert_local_sender(local_notification_tx)
        .await;
    let join_handle = tokio::spawn(async move {
        // Recomputes all serving vnode mappings from scratch and pushes a full
        // snapshot to the frontends.
        let reset = || async {
            let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
            let max_serving_parallelism = session_params
                .get_params()
                .await
                .batch_parallelism()
                .map(|p| p.get());
            let (mappings, failed) = serving_vnode_mapping.upsert(
                streaming_parallelisms,
                &workers,
                max_serving_parallelism,
            );
            tracing::debug!(
                "Updated serving vnode mapping snapshot for fragments {:?}.",
                mappings.keys()
            );
            if !failed.is_empty() {
                tracing::warn!(
                    "Failed to update serving vnode mapping for fragments {:?}.",
                    failed
                );
            }
            notification_manager.notify_frontend_without_version(
                Operation::Snapshot,
                Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                    mappings: to_fragment_worker_slot_mapping(&mappings),
                }),
            );
        };
        loop {
            tokio::select! {
                notification = local_notification_rx.recv() => {
                    match notification {
                        Some(notification) => {
                            match notification {
                                LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) => {
                                    // Only serving compute nodes affect placement.
                                    if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
                                        continue;
                                    }
                                    reset().await;
                                }
                                LocalNotification::BatchParallelismChange => {
                                    reset().await;
                                }
                                LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
                                    if fragment_ids.is_empty() {
                                        continue;
                                    }
                                    // Recompute mappings only for the affected fragments.
                                    let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
                                    let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
                                        match streaming_parallelisms.get(frag_id) {
                                            Some(info) => Some((*frag_id, info.clone())),
                                            None => {
                                                tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
                                                None
                                            }
                                        }
                                    }).collect();
                                    let max_serving_parallelism = session_params
                                        .get_params()
                                        .await
                                        .batch_parallelism()
                                        .map(|p| p.get());
                                    let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers, max_serving_parallelism);
                                    if !upserted.is_empty() {
                                        tracing::debug!("Updated serving vnode mapping for fragments {:?}.", upserted.keys());
                                        notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings { mappings: to_fragment_worker_slot_mapping(&upserted) }));
                                    }
                                    if !failed.is_empty() {
                                        tracing::warn!("Failed to update serving vnode mapping for fragments {:?}.", failed);
                                        notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings { mappings: to_deleted_fragment_worker_slot_mapping(&failed) }));
                                    }
                                }
                                LocalNotification::FragmentMappingsDelete(fragment_ids) => {
                                    if fragment_ids.is_empty() {
                                        continue;
                                    }
                                    tracing::debug!("Deleting serving vnode mapping for fragments {:?}.", fragment_ids);
                                    serving_vnode_mapping.remove(&fragment_ids);
                                    notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings { mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) }));
                                }
                                _ => {}
                            }
                        }
                        None => {
                            return;
                        }
                    }
                }
                _ = &mut shutdown_rx => {
                    return;
                }
            }
        }
    });
    (join_handle, shutdown_tx)
}