// risingwave_meta_service/scale_service.rs

use risingwave_common::catalog::TableId;
use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{
    JobReschedulePlan, JobReschedulePostUpdates, RescheduleOptions, WorkerReschedule,
};
use risingwave_meta_model::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::GlobalStreamManagerRef;

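/// gRPC implementation of the meta node's `ScaleService`, which exposes cluster
/// and fragment information and applies manual worker-level reschedules.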
pub struct ScaleServiceImpl {
    metadata_manager: MetadataManager,
    stream_manager: GlobalStreamManagerRef,
    barrier_manager: BarrierManagerRef,
    env: MetaSrvEnv,
}

impl ScaleServiceImpl {
    pub fn new(
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        barrier_manager: BarrierManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager,
            stream_manager,
            barrier_manager,
            env,
        }
    }
}

#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
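    /// Returns a snapshot of the cluster: compute worker nodes, every streaming
    /// job's fragments (with upstream and dispatcher info), per-actor source
    /// split assignments, and the source catalogs.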
    async fn get_cluster_info(
        &self,
        _: Request<GetClusterInfoRequest>,
    ) -> Result<Response<GetClusterInfoResponse>, Status> {
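        // Hold the reschedule lock (read) while building the snapshot so that
        // no reschedule can run concurrently.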
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

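        // Collect every streaming job's fragments and convert them to protobuf,
        // enriching each with its upstream fragments and actor dispatchers.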
        let stream_job_fragments = self
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await?;

        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
        for (_, stream_job_fragments) in stream_job_fragments {
            let upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(stream_job_fragments.fragment_ids())
                .await?;
            let dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    stream_job_fragments
                        .fragment_ids()
                        .map(|id| id as _)
                        .collect(),
                )
                .await?;
            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
        }

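        // All compute nodes currently known to the meta node.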
        let worker_nodes = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await?;

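        // Current connector split assignment for each actor.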
        let actor_splits = self
            .env
            .shared_actor_infos()
            .list_assignments()
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                )
            })
            .collect();

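        // Source catalogs, keyed by source id.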
        let sources = self.metadata_manager.list_sources().await?;
        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();

        Ok(Response::new(GetClusterInfoResponse {
            worker_nodes,
            table_fragments,
            actor_splits,
            source_infos,
            revision: 0,
        }))
    }

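    /// Applies a manual, worker-level reschedule: for each fragment, the request
    /// carries a per-worker diff of actors to add or remove.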
    async fn reschedule(
        &self,
        request: Request<RescheduleRequest>,
    ) -> Result<Response<RescheduleResponse>, Status> {
        self.barrier_manager.check_status_running()?;

        let RescheduleRequest {
            worker_reschedules,
            resolve_no_shuffle_upstream,
            ..
        } = request.into_inner();

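        // Take the reschedule lock exclusively, then apply the reschedules one
        // database at a time.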
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
        for (database_id, worker_reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(worker_reschedules)
            .await?
        {
            let streaming_job_ids = self
                .metadata_manager
                .catalog_controller
                .get_fragment_job_id(
                    worker_reschedules
                        .keys()
                        .map(|id| *id as FragmentId)
                        .collect(),
                )
                .await?;

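            // Every job touched by the reschedule is switched to custom parallelism.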
            let table_parallelisms = streaming_job_ids
                .into_iter()
                .map(|id| (TableId::new(id as _), TableParallelism::Custom))
                .collect();

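            // Translate the protobuf worker diffs into the internal reschedule plan
            // and hand it to the stream manager.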
            self.stream_manager
                .reschedule_actors(
                    database_id,
                    JobReschedulePlan {
                        reschedules: worker_reschedules
                            .into_iter()
                            .map(|(fragment_id, reschedule)| {
                                let PbWorkerReschedule { worker_actor_diff } = reschedule;
                                (
                                    fragment_id,
                                    WorkerReschedule {
                                        worker_actor_diff: worker_actor_diff
                                            .into_iter()
                                            .map(|(worker_id, diff)| (worker_id as _, diff as _))
                                            .collect(),
                                    },
                                )
                            })
                            .collect(),
                        post_updates: JobReschedulePostUpdates {
                            parallelism_updates: table_parallelisms,
                            resource_group_updates: Default::default(),
                        },
                    },
                    RescheduleOptions {
                        resolve_no_shuffle_upstream,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
        }

        Ok(Response::new(RescheduleResponse {
            success: true,
            revision: 0,
        }))
    }

    async fn update_streaming_job_node_labels(
        &self,
        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
        todo!()
    }

    async fn get_serverless_streaming_jobs_status(
        &self,
        _request: Request<GetServerlessStreamingJobsStatusRequest>,
    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
        todo!()
    }
}