risingwave_meta/serving/
mod.rs1use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::RwLock;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
24use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
25use tokio::sync::oneshot::Sender;
26use tokio::task::JoinHandle;
27
28use crate::controller::fragment::FragmentParallelismInfo;
29use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
30use crate::model::FragmentId;
31
32pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;
33
34#[derive(Default)]
35pub struct ServingVnodeMapping {
36 serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
37}
38
39impl ServingVnodeMapping {
40 pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
41 self.serving_vnode_mappings.read().clone()
42 }
43
44 pub fn upsert(
47 &self,
48 streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
49 workers: &[WorkerNode],
50 ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
51 let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
52 let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
53 let mut failed: Vec<FragmentId> = vec![];
54 for (fragment_id, info) in streaming_parallelisms {
55 let new_mapping = {
56 let old_mapping = serving_vnode_mappings.get(&fragment_id);
57 let max_parallelism = match info.distribution_type {
58 FragmentDistributionType::Unspecified => unreachable!(),
59 FragmentDistributionType::Single => Some(1),
60 FragmentDistributionType::Hash => None,
61 };
62 place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
63 };
64 match new_mapping {
65 None => {
66 serving_vnode_mappings.remove(&fragment_id as _);
67 failed.push(fragment_id);
68 }
69 Some(mapping) => {
70 serving_vnode_mappings.insert(fragment_id, mapping.clone());
71 upserted.insert(fragment_id, mapping);
72 }
73 }
74 }
75 (upserted, failed)
76 }
77
78 fn remove(&self, fragment_ids: &[FragmentId]) {
79 let mut mappings = self.serving_vnode_mappings.write();
80 for fragment_id in fragment_ids {
81 mappings.remove(fragment_id);
82 }
83 }
84}
85
86pub(crate) fn to_fragment_worker_slot_mapping(
87 mappings: &HashMap<FragmentId, WorkerSlotMapping>,
88) -> Vec<FragmentWorkerSlotMapping> {
89 mappings
90 .iter()
91 .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
92 fragment_id: *fragment_id,
93 mapping: Some(mapping.to_protobuf()),
94 })
95 .collect()
96}
97
98pub(crate) fn to_deleted_fragment_worker_slot_mapping(
99 fragment_ids: &[FragmentId],
100) -> Vec<FragmentWorkerSlotMapping> {
101 fragment_ids
102 .iter()
103 .map(|fragment_id| FragmentWorkerSlotMapping {
104 fragment_id: *fragment_id,
105 mapping: None,
106 })
107 .collect()
108}
109
110pub async fn on_meta_start(
111 notification_manager: NotificationManagerRef,
112 metadata_manager: &MetadataManager,
113 serving_vnode_mapping: ServingVnodeMappingRef,
114) {
115 let (serving_compute_nodes, streaming_parallelisms) =
116 fetch_serving_infos(metadata_manager).await;
117 let (mappings, _) =
118 serving_vnode_mapping.upsert(streaming_parallelisms, &serving_compute_nodes);
119 tracing::debug!(
120 "Initialize serving vnode mapping snapshot for fragments {:?}.",
121 mappings.keys()
122 );
123 notification_manager.notify_frontend_without_version(
124 Operation::Snapshot,
125 Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
126 mappings: to_fragment_worker_slot_mapping(&mappings),
127 }),
128 );
129}
130
131async fn fetch_serving_infos(
132 metadata_manager: &MetadataManager,
133) -> (
134 Vec<WorkerNode>,
135 HashMap<FragmentId, FragmentParallelismInfo>,
136) {
137 let parallelisms = metadata_manager
139 .catalog_controller
140 .running_fragment_parallelisms(None)
141 .await
142 .expect("fail to fetch running parallelisms");
143 let serving_compute_nodes = metadata_manager
144 .cluster_controller
145 .list_active_serving_workers()
146 .await
147 .expect("fail to list serving compute nodes");
148 (
149 serving_compute_nodes,
150 parallelisms
151 .into_iter()
152 .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
153 .collect(),
154 )
155}
156
157pub async fn start_serving_vnode_mapping_worker(
158 notification_manager: NotificationManagerRef,
159 metadata_manager: MetadataManager,
160 serving_vnode_mapping: ServingVnodeMappingRef,
161) -> (JoinHandle<()>, Sender<()>) {
162 let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel();
163 let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
164 notification_manager
165 .insert_local_sender(local_notification_tx)
166 .await;
167 let join_handle = tokio::spawn(async move {
168 loop {
169 tokio::select! {
170 notification = local_notification_rx.recv() => {
171 match notification {
172 Some(notification) => {
173 match notification {
174 LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) => {
175 if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
176 continue;
177 }
178 let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
179 let (mappings, _) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
180 tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys());
181 notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&mappings) }));
182 }
183 LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
184 if fragment_ids.is_empty() {
185 continue;
186 }
187 let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
188 let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
189 match streaming_parallelisms.get(frag_id) {
190 Some(info) => Some((*frag_id, info.clone())),
191 None => {
192 tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
193 None
194 }
195 }
196 }).collect();
197 let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers);
198 if !upserted.is_empty() {
199 tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
200 notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));
201 }
202 if !failed.is_empty() {
203 tracing::debug!("Fail to update serving vnode mapping for fragments {:?}.", failed);
204 notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&failed)}));
205 }
206 }
207 LocalNotification::FragmentMappingsDelete(fragment_ids) => {
208 if fragment_ids.is_empty() {
209 continue;
210 }
211 tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
212 serving_vnode_mapping.remove(&fragment_ids);
213 notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) }));
214 }
215 _ => {}
216 }
217 }
218 None => {
219 return;
220 }
221 }
222 }
223 _ = &mut shutdown_rx => {
224 return;
225 }
226 }
227 }
228 });
229 (join_handle, shutdown_tx)
230}