risingwave_meta_service/scale_service.rs

use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{
    JobReschedulePlan, JobReschedulePostUpdates, RescheduleOptions, ScaleControllerRef,
    WorkerReschedule,
};
use risingwave_meta_model::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

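/// gRPC handler for the meta node's `ScaleService`: exposes a snapshot of the streaming
/// cluster (`GetClusterInfo`) and manual, per-worker actor rescheduling (`Reschedule`).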
pub struct ScaleServiceImpl {
    metadata_manager: MetadataManager,
    source_manager: SourceManagerRef,
    stream_manager: GlobalStreamManagerRef,
    barrier_manager: BarrierManagerRef,
}

impl ScaleServiceImpl {
    pub fn new(
        metadata_manager: MetadataManager,
        source_manager: SourceManagerRef,
        stream_manager: GlobalStreamManagerRef,
        barrier_manager: BarrierManagerRef,
        _scale_controller: ScaleControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            source_manager,
            stream_manager,
            barrier_manager,
        }
    }
}

#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
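    /// Returns a snapshot of cluster state: streaming job fragments, compute worker nodes,
    /// source split assignments per actor, and source catalogs. Holds the reschedule read
    /// lock so the snapshot is not taken while a reschedule is in flight.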
    async fn get_cluster_info(
        &self,
        _: Request<GetClusterInfoRequest>,
    ) -> Result<Response<GetClusterInfoResponse>, Status> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let stream_job_fragments = self
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await?;

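        // Build the protobuf representation of every streaming job's fragments, resolving
        // each job's upstream fragments and actor dispatchers from the catalog.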
        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
        for (_, stream_job_fragments) in stream_job_fragments {
            let upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(stream_job_fragments.fragment_ids())
                .await?;
            let dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    stream_job_fragments
                        .fragment_ids()
                        .map(|id| id as _)
                        .collect(),
                )
                .await?;
            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
        }

        let worker_nodes = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await?;

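        // Collect the current source split assignments, keyed by actor id, in protobuf form.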
        let actor_splits = self
            .source_manager
            .list_assignments()
            .await
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                )
            })
            .collect();

        let sources = self.metadata_manager.list_sources().await?;
        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();

        Ok(Response::new(GetClusterInfoResponse {
            worker_nodes,
            table_fragments,
            actor_splits,
            source_infos,
            revision: 0,
        }))
    }

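    /// Applies a manual reschedule plan: each request entry maps a fragment to a per-worker
    /// actor-count diff, and every affected streaming job is switched to custom parallelism.
    /// Fails fast if the barrier manager is not running.
    ///
    /// Illustrative example (hypothetical ids): `{ fragment 1: { worker 2: +1, worker 3: -1 } }`
    /// asks for one more actor of fragment 1 on worker 2 and one fewer on worker 3.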
    async fn reschedule(
        &self,
        request: Request<RescheduleRequest>,
    ) -> Result<Response<RescheduleResponse>, Status> {
        self.barrier_manager.check_status_running()?;

        let RescheduleRequest {
            worker_reschedules,
            resolve_no_shuffle_upstream,
            ..
        } = request.into_inner();

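        // Take the reschedule write lock, then apply reschedules per database: the requested
        // fragment map is split by database id and each group is submitted to the stream
        // manager separately.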
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
        for (database_id, worker_reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(worker_reschedules)
            .await?
        {
            let streaming_job_ids = self
                .metadata_manager
                .catalog_controller
                .get_fragment_job_id(
                    worker_reschedules
                        .keys()
                        .map(|id| *id as FragmentId)
                        .collect(),
                )
                .await?;

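            // Manual rescheduling overrides the configured parallelism, so every affected
            // streaming job is marked with `TableParallelism::Custom`.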
            let table_parallelisms = streaming_job_ids
                .into_iter()
                .map(|id| (TableId::new(id as _), TableParallelism::Custom))
                .collect();

            self.stream_manager
                .reschedule_actors(
                    database_id,
                    JobReschedulePlan {
                        reschedules: worker_reschedules
                            .into_iter()
                            .map(|(fragment_id, reschedule)| {
                                let PbWorkerReschedule { worker_actor_diff } = reschedule;
                                (
                                    fragment_id,
                                    WorkerReschedule {
                                        worker_actor_diff: worker_actor_diff
                                            .into_iter()
                                            .map(|(worker_id, diff)| (worker_id as _, diff as _))
                                            .collect(),
                                    },
                                )
                            })
                            .collect(),
                        post_updates: JobReschedulePostUpdates {
                            parallelism_updates: table_parallelisms,
                            resource_group_updates: Default::default(),
                        },
                    },
                    RescheduleOptions {
                        resolve_no_shuffle_upstream,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
        }

        Ok(Response::new(RescheduleResponse {
            success: true,
            revision: 0,
        }))
    }

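    // The remaining RPCs are not implemented by this service (`todo!()`).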
    async fn update_streaming_job_node_labels(
        &self,
        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
        todo!()
    }

    async fn get_serverless_streaming_jobs_status(
        &self,
        _request: Request<GetServerlessStreamingJobsStatusRequest>,
    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
        todo!()
    }
}