risingwave_meta_service/
serving_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::MetadataManager;
16use risingwave_pb::meta::serving_service_server::ServingService;
17use risingwave_pb::meta::{
18    FragmentWorkerSlotMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse,
19};
20use tonic::{Request, Response, Status};
21
22use crate::serving::ServingVnodeMappingRef;
23
24pub struct ServingServiceImpl {
25    serving_vnode_mapping: ServingVnodeMappingRef,
26    metadata_manager: MetadataManager,
27}
28
29impl ServingServiceImpl {
30    pub fn new(
31        serving_vnode_mapping: ServingVnodeMappingRef,
32        metadata_manager: MetadataManager,
33    ) -> Self {
34        Self {
35            serving_vnode_mapping,
36            metadata_manager,
37        }
38    }
39}
40
41#[async_trait::async_trait]
42impl ServingService for ServingServiceImpl {
43    async fn get_serving_vnode_mappings(
44        &self,
45        _request: Request<GetServingVnodeMappingsRequest>,
46    ) -> Result<Response<GetServingVnodeMappingsResponse>, Status> {
47        let mappings = self
48            .serving_vnode_mapping
49            .all()
50            .into_iter()
51            .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
52                fragment_id,
53                mapping: Some(mapping.to_protobuf()),
54            })
55            .collect();
56        let fragment_to_table = self
57            .metadata_manager
58            .catalog_controller
59            .fragment_job_mapping()
60            .await?
61            .into_iter()
62            .map(|(fragment_id, job_id)| (fragment_id as u32, job_id as u32))
63            .collect();
64        Ok(Response::new(GetServingVnodeMappingsResponse {
65            fragment_to_table,
66            worker_slot_mappings: mappings,
67        }))
68    }
69}