risingwave_meta/serving/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::RwLock;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
24use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
25use tokio::sync::oneshot::Sender;
26use tokio::task::JoinHandle;
27
28use crate::controller::fragment::FragmentParallelismInfo;
29use crate::controller::session_params::SessionParamsControllerRef;
30use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
31use crate::model::FragmentId;
32
/// Shared, reference-counted handle to a [`ServingVnodeMapping`].
pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;
34
/// Registry of the serving vnode mapping currently assigned to each fragment.
///
/// The map is wrapped in a read-write lock so it can be read and updated
/// through a shared reference.
#[derive(Default)]
pub struct ServingVnodeMapping {
    /// Worker-slot mapping of each fragment, keyed by fragment id.
    serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
}
39
40impl ServingVnodeMapping {
41    pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
42        self.serving_vnode_mappings.read().clone()
43    }
44
45    /// Upsert mapping for given fragments according to the latest `workers`.
46    /// Returns (successful updates, failed updates).
47    pub fn upsert(
48        &self,
49        streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
50        workers: &[WorkerNode],
51        max_serving_parallelism: Option<u64>,
52    ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
53        let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
54        let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
55        let mut failed: Vec<FragmentId> = vec![];
56        for (fragment_id, info) in streaming_parallelisms {
57            let new_mapping = {
58                let old_mapping = serving_vnode_mappings.get(&fragment_id);
59                let max_parallelism = match info.distribution_type {
60                    FragmentDistributionType::Unspecified => unreachable!(),
61                    FragmentDistributionType::Single => Some(1),
62                    FragmentDistributionType::Hash => None,
63                }
64                .or_else(|| max_serving_parallelism.map(|p| p as usize));
65                place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
66            };
67            match new_mapping {
68                None => {
69                    serving_vnode_mappings.remove(&fragment_id as _);
70                    failed.push(fragment_id);
71                }
72                Some(mapping) => {
73                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
74                    upserted.insert(fragment_id, mapping);
75                }
76            }
77        }
78        (upserted, failed)
79    }
80
81    fn remove(&self, fragment_ids: &[FragmentId]) {
82        let mut mappings = self.serving_vnode_mappings.write();
83        for fragment_id in fragment_ids {
84            mappings.remove(fragment_id);
85        }
86    }
87}
88
89pub(crate) fn to_fragment_worker_slot_mapping(
90    mappings: &HashMap<FragmentId, WorkerSlotMapping>,
91) -> Vec<FragmentWorkerSlotMapping> {
92    mappings
93        .iter()
94        .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
95            fragment_id: *fragment_id,
96            mapping: Some(mapping.to_protobuf()),
97        })
98        .collect()
99}
100
101pub(crate) fn to_deleted_fragment_worker_slot_mapping(
102    fragment_ids: &[FragmentId],
103) -> Vec<FragmentWorkerSlotMapping> {
104    fragment_ids
105        .iter()
106        .map(|fragment_id| FragmentWorkerSlotMapping {
107            fragment_id: *fragment_id,
108            mapping: None,
109        })
110        .collect()
111}
112
113pub async fn on_meta_start(
114    notification_manager: NotificationManagerRef,
115    metadata_manager: &MetadataManager,
116    serving_vnode_mapping: ServingVnodeMappingRef,
117    max_serving_parallelism: Option<u64>,
118) {
119    let (serving_compute_nodes, streaming_parallelisms) =
120        fetch_serving_infos(metadata_manager).await;
121    let (mappings, failed) = serving_vnode_mapping.upsert(
122        streaming_parallelisms,
123        &serving_compute_nodes,
124        max_serving_parallelism,
125    );
126    tracing::debug!(
127        "Initialize serving vnode mapping snapshot for fragments {:?}.",
128        mappings.keys()
129    );
130    if !failed.is_empty() {
131        tracing::warn!(
132            "Fail to update serving vnode mapping for fragments {:?}.",
133            failed
134        );
135    }
136    notification_manager.notify_frontend_without_version(
137        Operation::Snapshot,
138        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
139            mappings: to_fragment_worker_slot_mapping(&mappings),
140        }),
141    );
142}
143
144async fn fetch_serving_infos(
145    metadata_manager: &MetadataManager,
146) -> (
147    Vec<WorkerNode>,
148    HashMap<FragmentId, FragmentParallelismInfo>,
149) {
150    // TODO: need another mechanism to refresh serving info instead of panic.
151    let parallelisms = metadata_manager
152        .catalog_controller
153        .running_fragment_parallelisms(None)
154        .expect("fail to fetch running parallelisms");
155    let serving_compute_nodes = metadata_manager
156        .cluster_controller
157        .list_active_serving_workers()
158        .await
159        .expect("fail to list serving compute nodes");
160    (
161        serving_compute_nodes,
162        parallelisms
163            .into_iter()
164            .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
165            .collect(),
166    )
167}
168
169pub fn start_serving_vnode_mapping_worker(
170    notification_manager: NotificationManagerRef,
171    metadata_manager: MetadataManager,
172    serving_vnode_mapping: ServingVnodeMappingRef,
173    session_params: SessionParamsControllerRef,
174) -> (JoinHandle<()>, Sender<()>) {
175    let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel();
176    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
177    notification_manager.insert_local_sender(local_notification_tx);
178    let join_handle = tokio::spawn(async move {
179        let reset = || async {
180            let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
181            let max_serving_parallelism = session_params
182                .get_params()
183                .await
184                .batch_parallelism()
185                .map(|p| p.get());
186            let (mappings, failed) = serving_vnode_mapping.upsert(
187                streaming_parallelisms,
188                &workers,
189                max_serving_parallelism,
190            );
191            tracing::debug!(
192                "Update serving vnode mapping snapshot for fragments {:?}.",
193                mappings.keys()
194            );
195            if !failed.is_empty() {
196                tracing::warn!(
197                    "Fail to update serving vnode mapping for fragments {:?}.",
198                    failed
199                );
200            }
201            notification_manager.notify_frontend_without_version(
202                Operation::Snapshot,
203                Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
204                    mappings: to_fragment_worker_slot_mapping(&mappings),
205                }),
206            );
207        };
208        loop {
209            tokio::select! {
210                notification = local_notification_rx.recv() => {
211                    match notification {
212                        Some(notification) => {
213                            match notification {
214                                LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) =>  {
215                                    if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
216                                        continue;
217                                    }
218                                    reset().await;
219                                }
220                                LocalNotification::BatchParallelismChange => {
221                                    reset().await;
222                                }
223                                LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
224                                    if fragment_ids.is_empty() {
225                                        continue;
226                                    }
227                                    let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
228                                    let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
229                                        match streaming_parallelisms.get(frag_id) {
230                                            Some(info) => Some((*frag_id, info.clone())),
231                                            None => {
232                                                tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
233                                                None
234                                            }
235                                        }
236                                    }).collect();
237                                    let max_serving_parallelism = session_params
238                                        .get_params()
239                                        .await
240                                        .batch_parallelism()
241                                        .map(|p|p.get());
242                                    let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers, max_serving_parallelism);
243                                    if !upserted.is_empty() {
244                                        tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
245                                        notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));
246                                    }
247                                    if !failed.is_empty() {
248                                        tracing::warn!("Fail to update serving vnode mapping for fragments {:?}.", failed);
249                                        notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&failed)}));
250                                    }
251                                }
252                                LocalNotification::FragmentMappingsDelete(fragment_ids) => {
253                                    if fragment_ids.is_empty() {
254                                        continue;
255                                    }
256                                    tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
257                                    serving_vnode_mapping.remove(&fragment_ids);
258                                    notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) }));
259                                }
260                                _ => {}
261                            }
262                        }
263                        None => {
264                            return;
265                        }
266                    }
267                }
268                _ = &mut shutdown_rx => {
269                    return;
270                }
271            }
272        }
273    });
274    (join_handle, shutdown_tx)
275}