risingwave_meta/serving/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::RwLock;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
24use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
25use tokio::sync::oneshot::Sender;
26use tokio::task::JoinHandle;
27
28use crate::controller::fragment::FragmentParallelismInfo;
29use crate::controller::session_params::SessionParamsControllerRef;
30use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
31use crate::model::FragmentId;
32
33pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;
34
35#[derive(Default)]
36pub struct ServingVnodeMapping {
37    serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
38}
39
40impl ServingVnodeMapping {
41    pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
42        self.serving_vnode_mappings.read().clone()
43    }
44
45    /// Upsert mapping for given fragments according to the latest `workers`.
46    /// Returns (successful updates, failed updates).
47    pub fn upsert(
48        &self,
49        streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
50        workers: &[WorkerNode],
51        max_serving_parallelism: Option<u64>,
52    ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
53        let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
54        let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
55        let mut failed: Vec<FragmentId> = vec![];
56        for (fragment_id, info) in streaming_parallelisms {
57            let new_mapping = {
58                let old_mapping = serving_vnode_mappings.get(&fragment_id);
59                let max_parallelism = match info.distribution_type {
60                    FragmentDistributionType::Unspecified => unreachable!(),
61                    FragmentDistributionType::Single => Some(1),
62                    FragmentDistributionType::Hash => None,
63                }
64                .or_else(|| max_serving_parallelism.map(|p| p as usize));
65                place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
66            };
67            match new_mapping {
68                None => {
69                    serving_vnode_mappings.remove(&fragment_id as _);
70                    failed.push(fragment_id);
71                }
72                Some(mapping) => {
73                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
74                    upserted.insert(fragment_id, mapping);
75                }
76            }
77        }
78        (upserted, failed)
79    }
80
81    fn remove(&self, fragment_ids: &[FragmentId]) {
82        let mut mappings = self.serving_vnode_mappings.write();
83        for fragment_id in fragment_ids {
84            mappings.remove(fragment_id);
85        }
86    }
87}
88
89pub(crate) fn to_fragment_worker_slot_mapping(
90    mappings: &HashMap<FragmentId, WorkerSlotMapping>,
91) -> Vec<FragmentWorkerSlotMapping> {
92    mappings
93        .iter()
94        .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
95            fragment_id: *fragment_id,
96            mapping: Some(mapping.to_protobuf()),
97        })
98        .collect()
99}
100
101pub(crate) fn to_deleted_fragment_worker_slot_mapping(
102    fragment_ids: &[FragmentId],
103) -> Vec<FragmentWorkerSlotMapping> {
104    fragment_ids
105        .iter()
106        .map(|fragment_id| FragmentWorkerSlotMapping {
107            fragment_id: *fragment_id,
108            mapping: None,
109        })
110        .collect()
111}
112
113pub async fn on_meta_start(
114    notification_manager: NotificationManagerRef,
115    metadata_manager: &MetadataManager,
116    serving_vnode_mapping: ServingVnodeMappingRef,
117    max_serving_parallelism: Option<u64>,
118) {
119    let (serving_compute_nodes, streaming_parallelisms) =
120        fetch_serving_infos(metadata_manager).await;
121    let (mappings, failed) = serving_vnode_mapping.upsert(
122        streaming_parallelisms,
123        &serving_compute_nodes,
124        max_serving_parallelism,
125    );
126    tracing::debug!(
127        "Initialize serving vnode mapping snapshot for fragments {:?}.",
128        mappings.keys()
129    );
130    if !failed.is_empty() {
131        tracing::warn!(
132            "Fail to update serving vnode mapping for fragments {:?}.",
133            failed
134        );
135    }
136    notification_manager.notify_frontend_without_version(
137        Operation::Snapshot,
138        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
139            mappings: to_fragment_worker_slot_mapping(&mappings),
140        }),
141    );
142}
143
144async fn fetch_serving_infos(
145    metadata_manager: &MetadataManager,
146) -> (
147    Vec<WorkerNode>,
148    HashMap<FragmentId, FragmentParallelismInfo>,
149) {
150    // TODO: need another mechanism to refresh serving info instead of panic.
151    let parallelisms = metadata_manager
152        .catalog_controller
153        .running_fragment_parallelisms(None)
154        .await
155        .expect("fail to fetch running parallelisms");
156    let serving_compute_nodes = metadata_manager
157        .cluster_controller
158        .list_active_serving_workers()
159        .await
160        .expect("fail to list serving compute nodes");
161    (
162        serving_compute_nodes,
163        parallelisms
164            .into_iter()
165            .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
166            .collect(),
167    )
168}
169
170pub fn start_serving_vnode_mapping_worker(
171    notification_manager: NotificationManagerRef,
172    metadata_manager: MetadataManager,
173    serving_vnode_mapping: ServingVnodeMappingRef,
174    session_params: SessionParamsControllerRef,
175) -> (JoinHandle<()>, Sender<()>) {
176    let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel();
177    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
178    notification_manager.insert_local_sender(local_notification_tx);
179    let join_handle = tokio::spawn(async move {
180        let reset = || async {
181            let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
182            let max_serving_parallelism = session_params
183                .get_params()
184                .await
185                .batch_parallelism()
186                .map(|p| p.get());
187            let (mappings, failed) = serving_vnode_mapping.upsert(
188                streaming_parallelisms,
189                &workers,
190                max_serving_parallelism,
191            );
192            tracing::debug!(
193                "Update serving vnode mapping snapshot for fragments {:?}.",
194                mappings.keys()
195            );
196            if !failed.is_empty() {
197                tracing::warn!(
198                    "Fail to update serving vnode mapping for fragments {:?}.",
199                    failed
200                );
201            }
202            notification_manager.notify_frontend_without_version(
203                Operation::Snapshot,
204                Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
205                    mappings: to_fragment_worker_slot_mapping(&mappings),
206                }),
207            );
208        };
209        loop {
210            tokio::select! {
211                notification = local_notification_rx.recv() => {
212                    match notification {
213                        Some(notification) => {
214                            match notification {
215                                LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) =>  {
216                                    if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
217                                        continue;
218                                    }
219                                    reset().await;
220                                }
221                                LocalNotification::BatchParallelismChange => {
222                                    reset().await;
223                                }
224                                LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
225                                    if fragment_ids.is_empty() {
226                                        continue;
227                                    }
228                                    let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
229                                    let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
230                                        match streaming_parallelisms.get(frag_id) {
231                                            Some(info) => Some((*frag_id, info.clone())),
232                                            None => {
233                                                tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
234                                                None
235                                            }
236                                        }
237                                    }).collect();
238                                    let max_serving_parallelism = session_params
239                                        .get_params()
240                                        .await
241                                        .batch_parallelism()
242                                        .map(|p|p.get());
243                                    let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers, max_serving_parallelism);
244                                    if !upserted.is_empty() {
245                                        tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
246                                        notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));
247                                    }
248                                    if !failed.is_empty() {
249                                        tracing::warn!("Fail to update serving vnode mapping for fragments {:?}.", failed);
250                                        notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&failed)}));
251                                    }
252                                }
253                                LocalNotification::FragmentMappingsDelete(fragment_ids) => {
254                                    if fragment_ids.is_empty() {
255                                        continue;
256                                    }
257                                    tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
258                                    serving_vnode_mapping.remove(&fragment_ids);
259                                    notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) }));
260                                }
261                                _ => {}
262                            }
263                        }
264                        None => {
265                            return;
266                        }
267                    }
268                }
269                _ = &mut shutdown_rx => {
270                    return;
271                }
272            }
273        }
274    });
275    (join_handle, shutdown_tx)
276}