risingwave_meta_service/
serving_service.rs1use risingwave_meta::manager::MetadataManager;
16use risingwave_pb::meta::serving_service_server::ServingService;
17use risingwave_pb::meta::{
18 FragmentWorkerSlotMapping, GetServingVnodeMappingsRequest, GetServingVnodeMappingsResponse,
19};
20use tonic::{Request, Response, Status};
21
22use crate::serving::ServingVnodeMappingRef;
23
24pub struct ServingServiceImpl {
25 serving_vnode_mapping: ServingVnodeMappingRef,
26 metadata_manager: MetadataManager,
27}
28
29impl ServingServiceImpl {
30 pub fn new(
31 serving_vnode_mapping: ServingVnodeMappingRef,
32 metadata_manager: MetadataManager,
33 ) -> Self {
34 Self {
35 serving_vnode_mapping,
36 metadata_manager,
37 }
38 }
39}
40
41#[async_trait::async_trait]
42impl ServingService for ServingServiceImpl {
43 async fn get_serving_vnode_mappings(
44 &self,
45 _request: Request<GetServingVnodeMappingsRequest>,
46 ) -> Result<Response<GetServingVnodeMappingsResponse>, Status> {
47 let mappings = self
48 .serving_vnode_mapping
49 .all()
50 .into_iter()
51 .map(|(fragment_id, mapping)| FragmentWorkerSlotMapping {
52 fragment_id,
53 mapping: Some(mapping.to_protobuf()),
54 })
55 .collect();
56 let fragment_to_table = self
57 .metadata_manager
58 .catalog_controller
59 .fragment_job_mapping()
60 .await?
61 .into_iter()
62 .map(|(fragment_id, job_id)| (fragment_id as u32, job_id as u32))
63 .collect();
64 Ok(Response::new(GetServingVnodeMappingsResponse {
65 fragment_to_table,
66 worker_slot_mappings: mappings,
67 }))
68 }
69}