risingwave_meta_service/
scale_service.rs1use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::meta::scale_service_server::ScaleService;
18use risingwave_pb::meta::{
19    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
20    GetServerlessStreamingJobsStatusResponse, RescheduleRequest, RescheduleResponse,
21    UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
22};
23use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
24use tonic::{Request, Response, Status};
25
26use crate::barrier::BarrierManagerRef;
27use crate::stream::GlobalStreamManagerRef;
28
29pub struct ScaleServiceImpl {
30    metadata_manager: MetadataManager,
31    stream_manager: GlobalStreamManagerRef,
32    env: MetaSrvEnv,
33}
34
35impl ScaleServiceImpl {
36    pub fn new(
37        metadata_manager: MetadataManager,
38        stream_manager: GlobalStreamManagerRef,
39        _barrier_manager: BarrierManagerRef,
40        env: MetaSrvEnv,
41    ) -> Self {
42        Self {
43            metadata_manager,
44            stream_manager,
45            env,
46        }
47    }
48}
49
50#[async_trait::async_trait]
51impl ScaleService for ScaleServiceImpl {
52    async fn get_cluster_info(
53        &self,
54        _: Request<GetClusterInfoRequest>,
55    ) -> Result<Response<GetClusterInfoResponse>, Status> {
56        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
57
58        let stream_job_fragments = self
59            .metadata_manager
60            .catalog_controller
61            .table_fragments()
62            .await?;
63
64        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
65        for (_, stream_job_fragments) in stream_job_fragments {
66            let upstreams = self
67                .metadata_manager
68                .catalog_controller
69                .upstream_fragments(stream_job_fragments.fragment_ids())
70                .await?;
71            let dispatchers = self
72                .metadata_manager
73                .catalog_controller
74                .get_fragment_actor_dispatchers(
75                    stream_job_fragments
76                        .fragment_ids()
77                        .map(|id| id as _)
78                        .collect(),
79                )
80                .await?;
81            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
82        }
83
84        let worker_nodes = self
85            .metadata_manager
86            .list_worker_node(Some(WorkerType::ComputeNode), None)
87            .await?;
88
89        let actor_splits = self
90            .env
91            .shared_actor_infos()
92            .list_assignments()
93            .into_iter()
94            .map(|(actor_id, splits)| {
95                (
96                    actor_id,
97                    ConnectorSplits {
98                        splits: splits.iter().map(ConnectorSplit::from).collect(),
99                    },
100                )
101            })
102            .collect();
103
104        let sources = self.metadata_manager.list_sources().await?;
105        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();
106
107        Ok(Response::new(GetClusterInfoResponse {
108            worker_nodes,
109            table_fragments,
110            actor_splits,
111            source_infos,
112            revision: 0,
113        }))
114    }
115
116    async fn reschedule(
117        &self,
118        _request: Request<RescheduleRequest>,
119    ) -> Result<Response<RescheduleResponse>, Status> {
120        Ok(Response::new(RescheduleResponse {
121            success: false,
122            revision: 0,
123        }))
124    }
125
126    async fn update_streaming_job_node_labels(
127        &self,
128        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
129    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
130        todo!()
131    }
132
133    async fn get_serverless_streaming_jobs_status(
134        &self,
135        _request: Request<GetServerlessStreamingJobsStatusRequest>,
136    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
137        todo!()
138    }
139}