risingwave_meta/serving/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::sync::Arc;
17
18use parking_lot::RwLock;
19use risingwave_common::hash::WorkerSlotMapping;
20use risingwave_common::vnode_mapping::vnode_placement::place_vnode;
21use risingwave_pb::common::{WorkerNode, WorkerType};
22use risingwave_pb::meta::subscribe_response::{Info, Operation};
23use risingwave_pb::meta::table_fragments::fragment::FragmentDistributionType;
24use risingwave_pb::meta::{FragmentWorkerSlotMapping, FragmentWorkerSlotMappings};
25use tokio::sync::oneshot::Sender;
26use tokio::task::JoinHandle;
27
28use crate::controller::fragment::FragmentParallelismInfo;
29use crate::manager::{LocalNotification, MetadataManager, NotificationManagerRef};
30use crate::model::FragmentId;
31
/// Shared, reference-counted handle to a [`ServingVnodeMapping`].
pub type ServingVnodeMappingRef = Arc<ServingVnodeMapping>;
33
/// Holds the serving vnode mapping of each fragment, keyed by fragment id.
///
/// The map is guarded by a `RwLock`, so it can be read and updated through a
/// shared reference.
#[derive(Default)]
pub struct ServingVnodeMapping {
    /// Current worker-slot mapping per fragment.
    serving_vnode_mappings: RwLock<HashMap<FragmentId, WorkerSlotMapping>>,
}
38
39impl ServingVnodeMapping {
40    pub fn all(&self) -> HashMap<FragmentId, WorkerSlotMapping> {
41        self.serving_vnode_mappings.read().clone()
42    }
43
44    /// Upsert mapping for given fragments according to the latest `workers`.
45    /// Returns (successful updates, failed updates).
46    pub fn upsert(
47        &self,
48        streaming_parallelisms: HashMap<FragmentId, FragmentParallelismInfo>,
49        workers: &[WorkerNode],
50    ) -> (HashMap<FragmentId, WorkerSlotMapping>, Vec<FragmentId>) {
51        let mut serving_vnode_mappings = self.serving_vnode_mappings.write();
52        let mut upserted: HashMap<FragmentId, WorkerSlotMapping> = HashMap::default();
53        let mut failed: Vec<FragmentId> = vec![];
54        for (fragment_id, info) in streaming_parallelisms {
55            let new_mapping = {
56                let old_mapping = serving_vnode_mappings.get(&fragment_id);
57                let max_parallelism = match info.distribution_type {
58                    FragmentDistributionType::Unspecified => unreachable!(),
59                    FragmentDistributionType::Single => Some(1),
60                    FragmentDistributionType::Hash => None,
61                };
62                place_vnode(old_mapping, workers, max_parallelism, info.vnode_count)
63            };
64            match new_mapping {
65                None => {
66                    serving_vnode_mappings.remove(&fragment_id as _);
67                    failed.push(fragment_id);
68                }
69                Some(mapping) => {
70                    serving_vnode_mappings.insert(fragment_id, mapping.clone());
71                    upserted.insert(fragment_id, mapping);
72                }
73            }
74        }
75        (upserted, failed)
76    }
77
78    fn remove(&self, fragment_ids: &[FragmentId]) {
79        let mut mappings = self.serving_vnode_mappings.write();
80        for fragment_id in fragment_ids {
81            mappings.remove(fragment_id);
82        }
83    }
84}
85
86pub(crate) fn to_fragment_worker_slot_mapping(
87    mappings: &HashMap<FragmentId, WorkerSlotMapping>,
88) -> Vec<FragmentWorkerSlotMapping> {
89    mappings
90        .iter()
91        .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
92            fragment_id: *fragment_id,
93            mapping: Some(mapping.to_protobuf()),
94        })
95        .collect()
96}
97
98pub(crate) fn to_deleted_fragment_worker_slot_mapping(
99    fragment_ids: &[FragmentId],
100) -> Vec<FragmentWorkerSlotMapping> {
101    fragment_ids
102        .iter()
103        .map(|fragment_id| FragmentWorkerSlotMapping {
104            fragment_id: *fragment_id,
105            mapping: None,
106        })
107        .collect()
108}
109
110pub async fn on_meta_start(
111    notification_manager: NotificationManagerRef,
112    metadata_manager: &MetadataManager,
113    serving_vnode_mapping: ServingVnodeMappingRef,
114) {
115    let (serving_compute_nodes, streaming_parallelisms) =
116        fetch_serving_infos(metadata_manager).await;
117    let (mappings, _) =
118        serving_vnode_mapping.upsert(streaming_parallelisms, &serving_compute_nodes);
119    tracing::debug!(
120        "Initialize serving vnode mapping snapshot for fragments {:?}.",
121        mappings.keys()
122    );
123    notification_manager.notify_frontend_without_version(
124        Operation::Snapshot,
125        Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings {
126            mappings: to_fragment_worker_slot_mapping(&mappings),
127        }),
128    );
129}
130
131async fn fetch_serving_infos(
132    metadata_manager: &MetadataManager,
133) -> (
134    Vec<WorkerNode>,
135    HashMap<FragmentId, FragmentParallelismInfo>,
136) {
137    // TODO: need another mechanism to refresh serving info instead of panic.
138    let parallelisms = metadata_manager
139        .catalog_controller
140        .running_fragment_parallelisms(None)
141        .await
142        .expect("fail to fetch running parallelisms");
143    let serving_compute_nodes = metadata_manager
144        .cluster_controller
145        .list_active_serving_workers()
146        .await
147        .expect("fail to list serving compute nodes");
148    (
149        serving_compute_nodes,
150        parallelisms
151            .into_iter()
152            .map(|(fragment_id, info)| (fragment_id as FragmentId, info))
153            .collect(),
154    )
155}
156
157pub async fn start_serving_vnode_mapping_worker(
158    notification_manager: NotificationManagerRef,
159    metadata_manager: MetadataManager,
160    serving_vnode_mapping: ServingVnodeMappingRef,
161) -> (JoinHandle<()>, Sender<()>) {
162    let (local_notification_tx, mut local_notification_rx) = tokio::sync::mpsc::unbounded_channel();
163    let (shutdown_tx, mut shutdown_rx) = tokio::sync::oneshot::channel();
164    notification_manager
165        .insert_local_sender(local_notification_tx)
166        .await;
167    let join_handle = tokio::spawn(async move {
168        loop {
169            tokio::select! {
170                notification = local_notification_rx.recv() => {
171                    match notification {
172                        Some(notification) => {
173                            match notification {
174                                LocalNotification::WorkerNodeActivated(w) | LocalNotification::WorkerNodeDeleted(w) =>  {
175                                    if w.r#type() != WorkerType::ComputeNode || !w.property.as_ref().is_some_and(|p| p.is_serving) {
176                                        continue;
177                                    }
178                                    let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
179                                    let (mappings, _) = serving_vnode_mapping.upsert(streaming_parallelisms, &workers);
180                                    tracing::debug!("Update serving vnode mapping snapshot for fragments {:?}.", mappings.keys());
181                                    notification_manager.notify_frontend_without_version(Operation::Snapshot, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&mappings) }));
182                                }
183                                LocalNotification::FragmentMappingsUpsert(fragment_ids) => {
184                                    if fragment_ids.is_empty() {
185                                        continue;
186                                    }
187                                    let (workers, streaming_parallelisms) = fetch_serving_infos(&metadata_manager).await;
188                                    let filtered_streaming_parallelisms = fragment_ids.iter().filter_map(|frag_id| {
189                                        match streaming_parallelisms.get(frag_id) {
190                                            Some(info) => Some((*frag_id, info.clone())),
191                                            None => {
192                                                tracing::warn!(fragment_id = *frag_id, "streaming parallelism not found");
193                                                None
194                                            }
195                                        }
196                                    }).collect();
197                                    let (upserted, failed) = serving_vnode_mapping.upsert(filtered_streaming_parallelisms, &workers);
198                                    if !upserted.is_empty() {
199                                        tracing::debug!("Update serving vnode mapping for fragments {:?}.", upserted.keys());
200                                        notification_manager.notify_frontend_without_version(Operation::Update, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_fragment_worker_slot_mapping(&upserted) }));
201                                    }
202                                    if !failed.is_empty() {
203                                        tracing::debug!("Fail to update serving vnode mapping for fragments {:?}.", failed);
204                                        notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&failed)}));
205                                    }
206                                }
207                                LocalNotification::FragmentMappingsDelete(fragment_ids) => {
208                                    if fragment_ids.is_empty() {
209                                        continue;
210                                    }
211                                    tracing::debug!("Delete serving vnode mapping for fragments {:?}.", fragment_ids);
212                                    serving_vnode_mapping.remove(&fragment_ids);
213                                    notification_manager.notify_frontend_without_version(Operation::Delete, Info::ServingWorkerSlotMappings(FragmentWorkerSlotMappings{ mappings: to_deleted_fragment_worker_slot_mapping(&fragment_ids) }));
214                                }
215                                _ => {}
216                            }
217                        }
218                        None => {
219                            return;
220                        }
221                    }
222                }
223                _ = &mut shutdown_rx => {
224                    return;
225                }
226            }
227        }
228    });
229    (join_handle, shutdown_tx)
230}