risingwave_meta_service/scale_service.rs
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{
    JobReschedulePlan, JobReschedulePostUpdates, RescheduleOptions, ScaleControllerRef,
    WorkerReschedule,
};
use risingwave_meta_model::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

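/// gRPC implementation of the meta node's `ScaleService`, exposing cluster
/// topology information and manual reschedule operations on top of the
/// metadata, source, stream, and barrier managers.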
pub struct ScaleServiceImpl {
    metadata_manager: MetadataManager,
    source_manager: SourceManagerRef,
    stream_manager: GlobalStreamManagerRef,
    barrier_manager: BarrierManagerRef,
}

impl ScaleServiceImpl {
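    /// Creates the service. The `ScaleControllerRef` is accepted by the
    /// constructor signature but is currently unused.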
    pub fn new(
        metadata_manager: MetadataManager,
        source_manager: SourceManagerRef,
        stream_manager: GlobalStreamManagerRef,
        barrier_manager: BarrierManagerRef,
        _scale_controller: ScaleControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            source_manager,
            stream_manager,
            barrier_manager,
        }
    }
}

#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
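    /// Returns a snapshot of the cluster: compute workers, per-job table
    /// fragments, actor split assignments, and source catalogs. Holds the
    /// reschedule read lock so the snapshot is not taken mid-reschedule.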
    #[cfg_attr(coverage, coverage(off))]
    async fn get_cluster_info(
        &self,
        _: Request<GetClusterInfoRequest>,
    ) -> Result<Response<GetClusterInfoResponse>, Status> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

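        // Convert every streaming job's fragments to protobuf, resolving the
        // upstream fragments and actor dispatchers needed for a complete graph.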
        let stream_job_fragments = self
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await?;

        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
        for (_, stream_job_fragments) in stream_job_fragments {
            let upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(stream_job_fragments.fragment_ids())
                .await?;
            let dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    stream_job_fragments
                        .fragment_ids()
                        .map(|id| id as _)
                        .collect(),
                )
                .await?;
            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
        }

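        // Only compute nodes are included in the worker listing.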
        let worker_nodes = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await?;

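        // Map each source actor to its currently assigned connector splits.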
        let actor_splits = self
            .source_manager
            .list_assignments()
            .await
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                )
            })
            .collect();

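        // Index the source catalogs by source id for the response.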
        let sources = self.metadata_manager.list_sources().await?;
        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();

        Ok(Response::new(GetClusterInfoResponse {
            worker_nodes,
            table_fragments,
            actor_splits,
            source_infos,
            revision: 0,
        }))
    }

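    /// Applies a manual, worker-level reschedule to the requested fragments.
    /// Affected jobs are switched to custom parallelism, since their actor
    /// placement is now user-specified.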
    #[cfg_attr(coverage, coverage(off))]
    async fn reschedule(
        &self,
        request: Request<RescheduleRequest>,
    ) -> Result<Response<RescheduleResponse>, Status> {
        self.barrier_manager.check_status_running()?;

        let RescheduleRequest {
            worker_reschedules,
            resolve_no_shuffle_upstream,
            ..
        } = request.into_inner();

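        // Take the exclusive reschedule lock, then handle the requested
        // fragments database by database.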
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
        for (database_id, worker_reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(worker_reschedules)
            .await?
        {
            let streaming_job_ids = self
                .metadata_manager
                .catalog_controller
                .get_fragment_job_id(
                    worker_reschedules
                        .keys()
                        .map(|id| *id as FragmentId)
                        .collect(),
                )
                .await?;

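            // Every streaming job touched by the reschedule is marked with
            // `Custom` parallelism.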
            let table_parallelisms = streaming_job_ids
                .into_iter()
                .map(|id| (TableId::new(id as _), TableParallelism::Custom))
                .collect();

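            // Convert the protobuf worker reschedules into the internal plan
            // and hand it to the stream manager for execution.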
            self.stream_manager
                .reschedule_actors(
                    database_id,
                    JobReschedulePlan {
                        reschedules: worker_reschedules
                            .into_iter()
                            .map(|(fragment_id, reschedule)| {
                                let PbWorkerReschedule { worker_actor_diff } = reschedule;
                                (
                                    fragment_id,
                                    WorkerReschedule {
                                        worker_actor_diff: worker_actor_diff
                                            .into_iter()
                                            .map(|(worker_id, diff)| (worker_id as _, diff as _))
                                            .collect(),
                                    },
                                )
                            })
                            .collect(),
                        post_updates: JobReschedulePostUpdates {
                            parallelism_updates: table_parallelisms,
                            resource_group_updates: Default::default(),
                        },
                    },
                    RescheduleOptions {
                        resolve_no_shuffle_upstream,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
        }

        Ok(Response::new(RescheduleResponse {
            success: true,
            revision: 0,
        }))
    }

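    /// Not yet implemented.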
    async fn update_streaming_job_node_labels(
        &self,
        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
        todo!()
    }

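    /// Not yet implemented.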
    async fn get_serverless_streaming_jobs_status(
        &self,
        _request: Request<GetServerlessStreamingJobsStatusRequest>,
    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
        todo!()
    }
}