1use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::RwLock;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
24use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
25use tokio::sync::oneshot::Sender;
26use tokio::task::JoinHandle;
27
28use crate::controller::fragment::FragmentParallelismInfo;
29use crate::controller::session_params::SessionParamsControllerRef;
30use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
31use crate::model::FragmentId;
32
/// Shared, reference-counted (`Arc`) handle to a [`ServingVnodeMapping`].
pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;
34
/// Stores one serving `WorkerSlotMapping` per fragment, keyed by `FragmentId`, behind a
/// read-write lock so it can be read and updated through a shared reference.
#[derive(Default)]
pub struct ServingVnodeMapping {
    /// Stored mapping for each fragment id.
    serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
}
39
40impl ServingVnodeMapping {
41 pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
42 self.serving_vnode_mappings.read().clone()
43 }
44
45 pub fn upsert(
48 &self,
49 streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
50 workers: &[WorkerNode],
51 max_serving_parallelism: Option<u64>,
52 ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
53 let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
54 let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
55 let mut failed: Vec<FragmentId> = vec![];
56 for (fragment_id, info) in streaming_parallelisms {
57 let new_mapping = {
58 let old_mapping = serving_vnode_mappings.get(&fragment_id);
59 let max_parallelism = match info.distribution_type {
60 FragmentDistributionType::Unspecified => unreachable!(),
61 FragmentDistributionType::Single => Some(1),
62 FragmentDistributionType::Hash => None,
63 }
64 .or_else(|| max_serving_parallelism.map(|p| p as usize));
65 place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
66 };
67 match new_mapping {
68 None => {
69 serving_vnode_mappings.remove(&fragment_id as _);
70 failed.push(fragment_id);
71 }
72 Some(mapping) => {
73 serving_vnode_mappings.insert(fragment_id, mapping.clone());
74 upserted.insert(fragment_id, mapping);
75 }
76 }
77 }
78 (upserted, failed)
79 }
80
81 fn remove(&self, fragment_ids: &[FragmentId]) {
82 let mut mappings = self.serving_vnode_mappings.write();
83 for fragment_id in fragment_ids {
84 mappings.remove(fragment_id);
85 }
86 }
87}
88
89pub(crate) fn to_fragment_worker_slot_mapping(
90 mappings: &HashMap<FragmentId, WorkerSlotMapping>,
91) -> Vec<FragmentWorkerSlotMapping> {
92 mappings
93 .iter()
94 .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
95 fragment_id: *fragment_id,
96 mapping: Some(mapping.to_protobuf()),
97 })
98 .collect()
99}
100
101pub(crate) fn to_deleted_fragment_worker_slot_mapping(
102 fragment_ids: &[FragmentId],
103) -> Vec<FragmentWorkerSlotMapping> {
104 fragment_ids
105 .iter()
106 .map(|fragment_id| FragmentWorkerSlotMapping {
107 fragment_id: *fragment_id,
108 mapping: None,
109 })
110 .collect()
111}
112
113pub async fn on_meta_start(
114 notification_manager: NotificationManagerRef,
115 metadata_manager: &MetadataManager,
116 serving_vnode_mapping: ServingVnodeMappingRef,
117 max_serving_parallelism: Option<u64>,
118) {
119 let (serving_compute_nodes, streaming_parallelisms) =
120 fetch_serving_infos(metadata_manager).await;
121 let (mappings, failed) = serving_vnode_mapping.upsert(
122 streaming_parallelisms,
123 &serving_compute_nodes,
124 max_serving_parallelism,
125 );
126 tracing::debug!(
127 "Initialize serving vnode mapping snapshot for fragments {:?}.",
128 mappings.keys()
129 );
130 if !failed.is_empty() {
131 tracing::warn!(
132 "Fail to update serving vnode mapping for fragments {:?}.",
133 failed
134 );
135 }
136 notification_manager.notify_frontend_without_version(
137 Operation::Snapshot,
138 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
139 mappings: to_fragment_worker_slot_mapping(&mappings),
140 }),
141 );
142}
143
144async fn fetch_serving_infos(
145 metadata_manager: &MetadataManager,
146) -> (
147 Vec<WorkerNode>,
148 HashMap<FragmentId, FragmentParallelismInfo>,
149) {
150 let parallelisms = metadata_manager
152 .catalog_controller
153 .running_fragment_parallelisms(None)
154 .await
155 .expect("fail to fetch running parallelisms");
156 let serving_compute_nodes = metadata_manager
157 .cluster_controller
158 .list_active_serving_workers()
159 .await
160 .expect("fail to list serving compute nodes");
161 (
162 serving_compute_nodes,
163 parallelisms
164 .into_iter()
165 .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
166 .collect(),
167 )
168}
169
170pub fn start_serving_vnode_mapping_worker(
171 notification_manager: NotificationManagerRef,
172 metadata_manager: MetadataManager,
173 serving_vnode_mapping: ServingVnodeMappingRef,
174 session_params: SessionParamsControllerRef,
175) -> (JoinHandle<()>, Sender<()>) {
176 let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel();
177 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
178 notification_manager.insert_local_sender(local_notification_tx);
179 let join_handle = tokio::spawn(async move {
180 let reset = || async {
181 let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
182 let max_serving_parallelism = session_params
183 .get_params()
184 .await
185 .batch_parallelism()
186 .map(|p| p.get());
187 let (mappings, failed) = serving_vnode_mapping.upsert(
188 streaming_parallelisms,
189 &workers,
190 max_serving_parallelism,
191 );
192 tracing::debug!(
193 "Update serving vnode mapping snapshot for fragments {:?}.",
194 mappings.keys()
195 );
196 if !failed.is_empty() {
197 tracing::warn!(
198 "Fail to update serving vnode mapping for fragments {:?}.",
199 failed
200 );
201 }
202 notification_manager.notify_frontend_without_version(
203 Operation::Snapshot,
204 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
205 mappings: to_fragment_worker_slot_mapping(&mappings),
206 }),
207 );
208 };
209 loop {
210 tokio::select! {
211 notification = local_notification_rx.recv() => {
212 match notification {
213 Some(notification) => {
214 match notification {
215 LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) => {
216 if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
217 continue;
218 }
219 reset().await;
220 }
221 LocalNotification::BatchParallelismChange => {
222 reset().await;
223 }
224 LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
225 if fragment_ids.is_empty() {
226 continue;
227 }
228 let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
229 let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
230 match streaming_parallelisms.get(frag_id) {
231 Some(info) => Some((*frag_id, info.clone())),
232 None => {
233 tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
234 None
235 }
236 }
237 }).collect();
238 let max_serving_parallelism = session_params
239 .get_params()
240 .await
241 .batch_parallelism()
242 .map(|p|p.get());
243 let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers, max_serving_parallelism);
244 if !upserted.is_empty() {
245 tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
246 notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));
247 }
248 if !failed.is_empty() {
249 tracing::warn!("Fail to update serving vnode mapping for fragments {:?}.", failed);
250 notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&failed)}));
251 }
252 }
253 LocalNotification::FragmentMappingsDelete(fragment_ids) => {
254 if fragment_ids.is_empty() {
255 continue;
256 }
257 tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
258 serving_vnode_mapping.remove(&fragment_ids);
259 notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) }));
260 }
261 _ => {}
262 }
263 }
264 None => {
265 return;
266 }
267 }
268 }
269 _ = &mut shutdown_rx => {
270 return;
271 }
272 }
273 }
274 });
275 (join_handle, shutdown_tx)
276}