risingwave_meta_service/scale_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::TableId;
use risingwave_meta::manager::MetadataManager;
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{
    JobReschedulePlan, JobReschedulePostUpdates, RescheduleOptions, ScaleControllerRef,
    WorkerReschedule,
};
use risingwave_meta_model::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::{GlobalStreamManagerRef, SourceManagerRef};

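/// Meta-side implementation of the `ScaleService` RPC service. It serves
/// cluster-topology snapshots and applies manual reschedule plans using the
/// metadata, source, stream, and barrier managers.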
pub struct ScaleServiceImpl {
    metadata_manager: MetadataManager,
    source_manager: SourceManagerRef,
    stream_manager: GlobalStreamManagerRef,
    barrier_manager: BarrierManagerRef,
}

impl ScaleServiceImpl {
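    /// Creates the service. The `_scale_controller` handle is accepted but
    /// currently unused.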
    pub fn new(
        metadata_manager: MetadataManager,
        source_manager: SourceManagerRef,
        stream_manager: GlobalStreamManagerRef,
        barrier_manager: BarrierManagerRef,
        _scale_controller: ScaleControllerRef,
    ) -> Self {
        Self {
            metadata_manager,
            source_manager,
            stream_manager,
            barrier_manager,
        }
    }
}

#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
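    /// Returns a snapshot of the cluster: every streaming job's fragments,
    /// the registered compute nodes, the source split assignment per actor,
    /// and the source catalog. The reschedule read lock is held so the
    /// topology cannot change while the snapshot is assembled; the returned
    /// `revision` is always 0.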
    async fn get_cluster_info(
        &self,
        _: Request<GetClusterInfoRequest>,
    ) -> Result<Response<GetClusterInfoResponse>, Status> {
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

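        // Materialize each streaming job's fragments into protobuf, resolving
        // upstream fragments and actor dispatchers along the way.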
        let stream_job_fragments = self
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await?;

        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
        for (_, stream_job_fragments) in stream_job_fragments {
            let upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(stream_job_fragments.fragment_ids())
                .await?;
            let dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    stream_job_fragments
                        .fragment_ids()
                        .map(|id| id as _)
                        .collect(),
                )
                .await?;
            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers));
        }

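        // All compute nodes currently registered with the meta service.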
        let worker_nodes = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await?;

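        // Current source split assignment for every actor, in protobuf form.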
        let actor_splits = self
            .source_manager
            .list_assignments()
            .await
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                )
            })
            .collect();

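        // Source catalog entries, keyed by source id.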
        let sources = self.metadata_manager.list_sources().await?;
        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();

        Ok(Response::new(GetClusterInfoResponse {
            worker_nodes,
            table_fragments,
            actor_splits,
            source_infos,
            revision: 0,
        }))
    }

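    /// Applies a manual reschedule plan. Fails fast unless the barrier
    /// manager is running, then splits the requested worker reschedules by
    /// database and applies each group under the exclusive reschedule lock.
    /// Every affected streaming job is marked as `TableParallelism::Custom`.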
    async fn reschedule(
        &self,
        request: Request<RescheduleRequest>,
    ) -> Result<Response<RescheduleResponse>, Status> {
        self.barrier_manager.check_status_running()?;

        let RescheduleRequest {
            worker_reschedules,
            resolve_no_shuffle_upstream,
            ..
        } = request.into_inner();

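        // Reschedules are applied one database at a time; the write guard
        // ensures no concurrent reschedule can interleave.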
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
        for (database_id, worker_reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(worker_reschedules)
            .await?
        {
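            // Resolve which streaming jobs own the fragments being rescheduled.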
            let streaming_job_ids = self
                .metadata_manager
                .catalog_controller
                .get_fragment_job_id(
                    worker_reschedules
                        .keys()
                        .map(|id| *id as FragmentId)
                        .collect(),
                )
                .await?;

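            // Manually rescheduled jobs are pinned to custom parallelism.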
            let table_parallelisms = streaming_job_ids
                .into_iter()
                .map(|id| (TableId::new(id as _), TableParallelism::Custom))
                .collect();

            self.stream_manager
                .reschedule_actors(
                    database_id,
                    JobReschedulePlan {
                        reschedules: worker_reschedules
                            .into_iter()
                            .map(|(fragment_id, reschedule)| {
                                let PbWorkerReschedule { worker_actor_diff } = reschedule;
                                (
                                    fragment_id,
                                    WorkerReschedule {
                                        worker_actor_diff: worker_actor_diff
                                            .into_iter()
                                            .map(|(worker_id, diff)| (worker_id as _, diff as _))
                                            .collect(),
                                    },
                                )
                            })
                            .collect(),
                        post_updates: JobReschedulePostUpdates {
                            parallelism_updates: table_parallelisms,
                            resource_group_updates: Default::default(),
                        },
                    },
                    RescheduleOptions {
                        resolve_no_shuffle_upstream,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
        }

        Ok(Response::new(RescheduleResponse {
            success: true,
            revision: 0,
        }))
    }

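    /// Not yet implemented; invoking this RPC panics via `todo!()`.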
    async fn update_streaming_job_node_labels(
        &self,
        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
        todo!()
    }

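    /// Not yet implemented; like the handler above, this panics via `todo!()`.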
    async fn get_serverless_streaming_jobs_status(
        &self,
        _request: Request<GetServerlessStreamingJobsStatusRequest>,
    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
        todo!()
    }
}