risingwave_meta_service/scale_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{
    JobReschedulePlan, JobReschedulePostUpdates, RescheduleOptions, ScaleControllerRef,
    WorkerReschedule,
};
use risingwave_meta_model::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

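/// gRPC handler for the meta node's `ScaleService`, exposing cluster topology
/// inspection (`get_cluster_info`) and manual actor rescheduling (`reschedule`).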
pub struct ScaleServiceImpl {
    metadata_manager: MetadataManager,
    source_manager: SourceManagerRef,
    stream_manager: GlobalStreamManagerRef,
    barrier_manager: BarrierManagerRef,
}

impl ScaleServiceImpl {
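    /// Creates the service. Note that the `ScaleControllerRef` parameter is
    /// currently accepted but unused and not stored.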
    pub fn new(
        metadata_manager: MetadataManager,
        source_manager: SourceManagerRef,
        stream_manager: GlobalStreamManagerRef,
        barrier_manager: BarrierManagerRef,
        _scale_controller: ScaleControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            source_manager,
            stream_manager,
            barrier_manager,
        }
    }
}

#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
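    /// Returns a snapshot of the cluster for scaling purposes: compute worker nodes,
    /// the fragments of every streaming job (with upstream and dispatcher info),
    /// per-actor source split assignments, and the source catalog. The reschedule
    /// read lock is held while collecting the snapshot so it cannot interleave with
    /// an ongoing reschedule.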
    #[cfg_attr(coverage, coverage(off))]
    async fn get_cluster_info(
        &self,
        _: Request<GetClusterInfoRequest>,
    ) -> Result<Response<GetClusterInfoResponse>, Status> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let stream_job_fragments = self
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await?;

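        // Convert each job's fragments to protobuf, attaching the upstream fragment
        // relationships and actor dispatchers resolved from the catalog.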
        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
        for (_, stream_job_fragments) in stream_job_fragments {
            let upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(stream_job_fragments.fragment_ids())
                .await?;
            let dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    stream_job_fragments
                        .fragment_ids()
                        .map(|id| id as _)
                        .collect(),
                )
                .await?;
            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
        }

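        // List the compute nodes registered in the cluster; streaming actors run on
        // these workers.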
        let worker_nodes = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await?;

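        // Collect the current source split assignment for every actor, converting
        // the splits to their protobuf representation.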
        let actor_splits = self
            .source_manager
            .list_assignments()
            .await
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                )
            })
            .collect();

        let sources = self.metadata_manager.list_sources().await?;
        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();

        Ok(Response::new(GetClusterInfoResponse {
            worker_nodes,
            table_fragments,
            actor_splits,
            source_infos,
            revision: 0,
        }))
    }

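    /// Applies a manual, per-worker reschedule to the requested fragments. Each
    /// fragment's reschedule is a diff of actor counts per worker; affected jobs are
    /// switched to custom parallelism. Requires the barrier manager to be running
    /// and holds the exclusive reschedule lock for the duration of the operation.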
    #[cfg_attr(coverage, coverage(off))]
    async fn reschedule(
        &self,
        request: Request<RescheduleRequest>,
    ) -> Result<Response<RescheduleResponse>, Status> {
        self.barrier_manager.check_status_running()?;

        let RescheduleRequest {
            worker_reschedules,
            resolve_no_shuffle_upstream,
            ..
        } = request.into_inner();

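        // Take the exclusive reschedule lock and apply the reschedules grouped by
        // database.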
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
        for (database_id, worker_reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(worker_reschedules)
            .await?
        {
            let streaming_job_ids = self
                .metadata_manager
                .catalog_controller
                .get_fragment_job_id(
                    worker_reschedules
                        .keys()
                        .map(|id| *id as FragmentId)
                        .collect(),
                )
                .await?;

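            // Every job touched by this manual reschedule is marked with custom
            // parallelism.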
            let table_parallelisms = streaming_job_ids
                .into_iter()
                .map(|id| (TableId::new(id as _), TableParallelism::Custom))
                .collect();

            self.stream_manager
                .reschedule_actors(
                    database_id,
                    JobReschedulePlan {
                        reschedules: worker_reschedules
                            .into_iter()
                            .map(|(fragment_id, reschedule)| {
                                let PbWorkerReschedule { worker_actor_diff } = reschedule;
                                (
                                    fragment_id,
                                    WorkerReschedule {
                                        worker_actor_diff: worker_actor_diff
                                            .into_iter()
                                            .map(|(worker_id, diff)| (worker_id as _, diff as _))
                                            .collect(),
                                    },
                                )
                            })
                            .collect(),
                        post_updates: JobReschedulePostUpdates {
                            parallelism_updates: table_parallelisms,
                            resource_group_updates: Default::default(),
                        },
                    },
                    RescheduleOptions {
                        resolve_no_shuffle_upstream,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
        }

        Ok(Response::new(RescheduleResponse {
            success: true,
            revision: 0,
        }))
    }

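    /// Not yet implemented.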
    async fn update_streaming_job_node_labels(
        &self,
        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
        todo!()
    }

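    /// Not yet implemented.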
    async fn get_serverless_streaming_jobs_status(
        &self,
        _request: Request<GetServerlessStreamingJobsStatusRequest>,
    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
        todo!()
    }
}