risingwave_meta/serving/mod.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use parking_lot::RwLock;
use risingwave_common::hash::WorkerSlotMapping;
use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
use risingwave_pb::common::{WorkerNode, WorkerType};
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
use tokio::sync::oneshot::Sender;
use tokio::task::JoinHandle;

use crate::controller::fragment::FragmentParallelismInfo;
use crate::controller::session_params::SessionParamsControllerRef;
use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
use crate::model::FragmentId;

pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;

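/// Maintains the latest vnode mapping used for serving (batch) queries, keyed by
/// fragment id.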
#[derive(Default)]
pub struct ServingVnodeMapping {
    serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
}

impl ServingVnodeMapping {
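    /// Returns a snapshot (clone) of all current fragment mappings.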
    pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
        self.serving_vnode_mappings.read().clone()
    }

    /// Upserts the mappings of the given fragments according to the latest `workers`.
    /// Returns the successfully updated mappings and the fragment ids that failed to update.
    pub fn upsert(
        &self,
        streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
        workers: &[WorkerNode],
        max_serving_parallelism: Option<u64>,
    ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
        let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
        let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
        let mut failed: Vec<FragmentId> = vec![];
        for (fragment_id, info) in streaming_parallelisms {
            let new_mapping = {
                let old_mapping = serving_vnode_mappings.get(&fragment_id);
                // `Single` fragments are always served by exactly one worker slot, while
                // `Hash` fragments are capped only by `max_serving_parallelism`, if set.
                let max_parallelism = match info.distribution_type {
                    FragmentDistributionType::Unspecified => unreachable!(),
                    FragmentDistributionType::Single => Some(1),
                    FragmentDistributionType::Hash => None,
                }
                .or_else(|| max_serving_parallelism.map(|p| p as usize));
                place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
            };
            match new_mapping {
                // `place_vnode` returns `None` when placement fails, e.g. when no serving
                // worker is available; drop any stale mapping in that case.
                None => {
                    serving_vnode_mappings.remove(&fragment_id);
                    failed.push(fragment_id);
                }
                Some(mapping) => {
                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
                    upserted.insert(fragment_id, mapping);
                }
            }
        }
        (upserted, failed)
    }

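    /// Removes the mappings of the given fragments, e.g. when they are dropped.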
    fn remove(&self, fragment_ids: &[FragmentId]) {
        let mut mappings = self.serving_vnode_mappings.write();
        for fragment_id in fragment_ids {
            mappings.remove(fragment_id);
        }
    }
}

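/// Converts the given mappings into protobuf `FragmentWorkerSlotMapping`s for use in
/// frontend notifications.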
pub(crate) fn to_fragment_worker_slot_mapping(
    mappings: &HashMap<FragmentId, WorkerSlotMapping>,
) -> Vec<FragmentWorkerSlotMapping> {
    mappings
        .iter()
        .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
            fragment_id: *fragment_id,
            mapping: Some(mapping.to_protobuf()),
        })
        .collect()
}

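/// Builds tombstone `FragmentWorkerSlotMapping`s (with `mapping: None`) that tell
/// frontends to remove the mappings of the given fragments.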
pub(crate) fn to_deleted_fragment_worker_slot_mapping(
    fragment_ids: &[FragmentId],
) -> Vec<FragmentWorkerSlotMapping> {
    fragment_ids
        .iter()
        .map(|fragment_id| FragmentWorkerSlotMapping {
            fragment_id: *fragment_id,
            mapping: None,
        })
        .collect()
}

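/// Computes the initial serving vnode mapping snapshot on meta node startup and
/// broadcasts it to frontend nodes.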
pub async fn on_meta_start(
    notification_manager: NotificationManagerRef,
    metadata_manager: &MetadataManager,
    serving_vnode_mapping: ServingVnodeMappingRef,
    max_serving_parallelism: Option<u64>,
) {
    let (serving_compute_nodes, streaming_parallelisms) =
        fetch_serving_infos(metadata_manager).await;
    let (mappings, failed) = serving_vnode_mapping.upsert(
        streaming_parallelisms,
        &serving_compute_nodes,
        max_serving_parallelism,
    );
    tracing::debug!(
        "Initialized serving vnode mapping snapshot for fragments {:?}.",
        mappings.keys()
    );
    if !failed.is_empty() {
        tracing::warn!(
            "Failed to update serving vnode mapping for fragments {:?}.",
            failed
        );
    }
    notification_manager.notify_frontend_without_version(
        Operation::Snapshot,
        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
            mappings: to_fragment_worker_slot_mapping(&mappings),
        }),
    );
}

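/// Fetches the current serving compute nodes and the parallelism info of all running
/// streaming fragments from the metadata store.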
async fn fetch_serving_infos(
    metadata_manager: &MetadataManager,
) -> (
    Vec<WorkerNode>,
    HashMap<FragmentId, FragmentParallelismInfo>,
) {
    // TODO: need another mechanism to refresh serving info instead of panicking.
    let parallelisms = metadata_manager
        .catalog_controller
        .running_fragment_parallelisms(None)
        .await
        .expect("failed to fetch running parallelisms");
    let serving_compute_nodes = metadata_manager
        .cluster_controller
        .list_active_serving_workers()
        .await
        .expect("failed to list serving compute nodes");
    (
        serving_compute_nodes,
        parallelisms
            .into_iter()
            .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
            .collect(),
    )
}

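/// Spawns a background worker that keeps the serving vnode mappings up to date by
/// listening to local notifications (worker membership, session parameter, and
/// fragment mapping changes). Returns the join handle and a shutdown sender.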
pub async fn start_serving_vnode_mapping_worker(
    notification_manager: NotificationManagerRef,
    metadata_manager: MetadataManager,
    serving_vnode_mapping: ServingVnodeMappingRef,
    session_params: SessionParamsControllerRef,
) -> (JoinHandle<()>, Sender<()>) {
    let (local_notification_tx, mut local_notification_rx) =
        tokio::sync::mpsc::unbounded_channel();
    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
    notification_manager
        .insert_local_sender(local_notification_tx)
        .await;
    let join_handle = tokio::spawn(async move {
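        // Recomputes the entire serving vnode mapping snapshot and broadcasts it to
        // all frontend nodes.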
        let reset = || async {
            let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
            let max_serving_parallelism = session_params
                .get_params()
                .await
                .batch_parallelism()
                .map(|p| p.get());
            let (mappings, failed) = serving_vnode_mapping.upsert(
                streaming_parallelisms,
                &workers,
                max_serving_parallelism,
            );
            tracing::debug!(
                "Updated serving vnode mapping snapshot for fragments {:?}.",
                mappings.keys()
            );
            if !failed.is_empty() {
                tracing::warn!(
                    "Failed to update serving vnode mapping for fragments {:?}.",
                    failed
                );
            }
            notification_manager.notify_frontend_without_version(
                Operation::Snapshot,
                Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                    mappings: to_fragment_worker_slot_mapping(&mappings),
                }),
            );
        };
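        // Process local notifications until the channel closes or shutdown is requested.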
        loop {
            tokio::select! {
                notification = local_notification_rx.recv() => {
                    match notification {
                        Some(notification) => {
                            match notification {
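                                // Membership of serving compute nodes changed: rebuild
                                // the entire snapshot.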
                                LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) => {
                                    // Only react to compute nodes that can serve batch queries.
                                    if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
                                        continue;
                                    }
                                    reset().await;
                                }
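                                // The `batch_parallelism` session parameter changed:
                                // rebuild with the new parallelism limit.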
                                LocalNotification::BatchParallelismChange => {
                                    reset().await;
                                }
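                                // Fragments were created or rescheduled: recompute mappings
                                // for just these fragments and push incremental updates.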
                                LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
                                    if fragment_ids.is_empty() {
                                        continue;
                                    }
                                    let (workers, streaming_parallelisms) =
                                        fetch_serving_infos(&metadata_manager).await;
                                    let filtered_streaming_parallelisms = fragment_ids
                                        .iter()
                                        .filter_map(|frag_id| match streaming_parallelisms.get(frag_id) {
                                            Some(info) => Some((*frag_id, info.clone())),
                                            None => {
                                                tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
                                                None
                                            }
                                        })
                                        .collect();
                                    let max_serving_parallelism = session_params
                                        .get_params()
                                        .await
                                        .batch_parallelism()
                                        .map(|p| p.get());
                                    let (upserted, failed) = serving_vnode_mapping.upsert(
                                        filtered_streaming_parallelisms,
                                        &workers,
                                        max_serving_parallelism,
                                    );
                                    if !upserted.is_empty() {
                                        tracing::debug!(
                                            "Updated serving vnode mapping for fragments {:?}.",
                                            upserted.keys()
                                        );
                                        notification_manager.notify_frontend_without_version(
                                            Operation::Update,
                                            Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                                                mappings: to_fragment_worker_slot_mapping(&upserted),
                                            }),
                                        );
                                    }
                                    if !failed.is_empty() {
                                        tracing::warn!(
                                            "Failed to update serving vnode mapping for fragments {:?}.",
                                            failed
                                        );
                                        notification_manager.notify_frontend_without_version(
                                            Operation::Delete,
                                            Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                                                mappings: to_deleted_fragment_worker_slot_mapping(&failed),
                                            }),
                                        );
                                    }
                                }
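                                // Fragments were dropped: remove their mappings and tell
                                // frontends to do the same.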
                                LocalNotification::FragmentMappingsDelete(fragment_ids) => {
                                    if fragment_ids.is_empty() {
                                        continue;
                                    }
                                    tracing::debug!(
                                        "Delete serving vnode mapping for fragments {:?}.",
                                        fragment_ids
                                    );
                                    serving_vnode_mapping.remove(&fragment_ids);
                                    notification_manager.notify_frontend_without_version(
                                        Operation::Delete,
                                        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
                                            mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids),
                                        }),
                                    );
                                }
                                _ => {}
                            }
                        }
                        None => {
                            return;
                        }
                    }
                }
                _ = &mut shutdown_rx => {
                    return;
                }
            }
        }
    });
    (join_handle, shutdown_tx)
}