use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, RescheduleRequest, RescheduleResponse,
    UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::GlobalStreamManagerRef;

29pub struct ScaleServiceImpl {
30 metadata_manager: MetadataManager,
31 stream_manager: GlobalStreamManagerRef,
32 env: MetaSrvEnv,
33}
34
35impl ScaleServiceImpl {
36 pub fn new(
37 metadata_manager: MetadataManager,
38 stream_manager: GlobalStreamManagerRef,
39 _barrier_manager: BarrierManagerRef,
40 env: MetaSrvEnv,
41 ) -> Self {
42 Self {
43 metadata_manager,
44 stream_manager,
45 env,
46 }
47 }
48}
49
50#[async_trait::async_trait]
51impl ScaleService for ScaleServiceImpl {
52 async fn get_cluster_info(
53 &self,
54 _: Request<GetClusterInfoRequest>,
55 ) -> Result<Response<GetClusterInfoResponse>, Status> {
56 let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
57
58 let stream_job_fragments = self
59 .metadata_manager
60 .catalog_controller
61 .table_fragments()
62 .await?;
63
64 let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
65 for (_, (stream_job_fragments, fragment_actors, actor_status)) in stream_job_fragments {
66 let upstreams = self
67 .metadata_manager
68 .catalog_controller
69 .upstream_fragments(stream_job_fragments.fragment_ids())
70 .await?;
71 let dispatchers = self
72 .metadata_manager
73 .catalog_controller
74 .get_fragment_actor_dispatchers(
75 stream_job_fragments
76 .fragment_ids()
77 .map(|id| id as _)
78 .collect(),
79 )
80 .await?;
81 table_fragments.push(stream_job_fragments.to_protobuf(
82 &fragment_actors,
83 &upstreams,
84 &dispatchers,
85 actor_status,
86 ))
87 }
88
89 let worker_nodes = self
90 .metadata_manager
91 .list_worker_node(Some(WorkerType::ComputeNode), None)
92 .await?;
93
94 let actor_splits = self
95 .env
96 .shared_actor_infos()
97 .list_assignments()
98 .into_iter()
99 .map(|(actor_id, splits)| {
100 (
101 actor_id,
102 ConnectorSplits {
103 splits: splits.iter().map(ConnectorSplit::from).collect(),
104 },
105 )
106 })
107 .collect();
108
109 let sources = self.metadata_manager.list_sources().await?;
110 let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();
111
112 Ok(Response::new(GetClusterInfoResponse {
113 worker_nodes,
114 table_fragments,
115 actor_splits,
116 source_infos,
117 revision: 0,
118 }))
119 }
120
121 async fn reschedule(
122 &self,
123 _request: Request<RescheduleRequest>,
124 ) -> Result<Response<RescheduleResponse>, Status> {
125 Ok(Response::new(RescheduleResponse {
126 success: false,
127 revision: 0,
128 }))
129 }
130
131 async fn update_streaming_job_node_labels(
132 &self,
133 _request: Request<UpdateStreamingJobNodeLabelsRequest>,
134 ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
135 todo!()
136 }
137
138 async fn get_serverless_streaming_jobs_status(
139 &self,
140 _request: Request<GetServerlessStreamingJobsStatusRequest>,
141 ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
142 todo!()
143 }
144}