// risingwave_meta_service/scale_service.rs
use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::meta::scale_service_server::ScaleService;
18use risingwave_pb::meta::{
19 GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
20 GetServerlessStreamingJobsStatusResponse, RescheduleRequest, RescheduleResponse,
21 UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
22};
23use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
24use tonic::{Request, Response, Status};
25
26use crate::barrier::BarrierManagerRef;
27use crate::stream::GlobalStreamManagerRef;
28
/// gRPC handler implementing the meta `ScaleService` (cluster-info and
/// rescheduling-related RPCs).
pub struct ScaleServiceImpl {
    // Catalog/fragment/worker metadata access (fragments, dispatchers, sources).
    metadata_manager: MetadataManager,
    // Global stream manager; used here to take the reschedule read lock while
    // snapshotting cluster info.
    stream_manager: GlobalStreamManagerRef,
    // Meta server environment; provides shared actor info (split assignments).
    env: MetaSrvEnv,
}
34
35impl ScaleServiceImpl {
36 pub fn new(
37 metadata_manager: MetadataManager,
38 stream_manager: GlobalStreamManagerRef,
39 _barrier_manager: BarrierManagerRef,
40 env: MetaSrvEnv,
41 ) -> Self {
42 Self {
43 metadata_manager,
44 stream_manager,
45 env,
46 }
47 }
48}
49
50#[async_trait::async_trait]
51impl ScaleService for ScaleServiceImpl {
52 async fn get_cluster_info(
53 &self,
54 _: Request<GetClusterInfoRequest>,
55 ) -> Result<Response<GetClusterInfoResponse>, Status> {
56 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
57
58 let stream_job_fragments = self
59 .metadata_manager
60 .catalog_controller
61 .table_fragments()
62 .await?;
63
64 let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
65 for (_, stream_job_fragments) in stream_job_fragments {
66 let upstreams = self
67 .metadata_manager
68 .catalog_controller
69 .upstream_fragments(stream_job_fragments.fragment_ids())
70 .await?;
71 let dispatchers = self
72 .metadata_manager
73 .catalog_controller
74 .get_fragment_actor_dispatchers(
75 stream_job_fragments
76 .fragment_ids()
77 .map(|id| id as _)
78 .collect(),
79 )
80 .await?;
81 table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
82 }
83
84 let worker_nodes = self
85 .metadata_manager
86 .list_worker_node(Some(WorkerType::ComputeNode), None)
87 .await?;
88
89 let actor_splits = self
90 .env
91 .shared_actor_infos()
92 .list_assignments()
93 .into_iter()
94 .map(|(actor_id, splits)| {
95 (
96 actor_id,
97 ConnectorSplits {
98 splits: splits.iter().map(ConnectorSplit::from).collect(),
99 },
100 )
101 })
102 .collect();
103
104 let sources = self.metadata_manager.list_sources().await?;
105 let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();
106
107 Ok(Response::new(GetClusterInfoResponse {
108 worker_nodes,
109 table_fragments,
110 actor_splits,
111 source_infos,
112 revision: 0,
113 }))
114 }
115
116 async fn reschedule(
117 &self,
118 _request: Request<RescheduleRequest>,
119 ) -> Result<Response<RescheduleResponse>, Status> {
120 Ok(Response::new(RescheduleResponse {
121 success: false,
122 revision: 0,
123 }))
124 }
125
126 async fn update_streaming_job_node_labels(
127 &self,
128 _request: Request<UpdateStreamingJobNodeLabelsRequest>,
129 ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
130 todo!()
131 }
132
133 async fn get_serverless_streaming_jobs_status(
134 &self,
135 _request: Request<GetServerlessStreamingJobsStatusRequest>,
136 ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
137 todo!()
138 }
139}