risingwave_meta_service/scale_service.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

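//! gRPC implementation of the meta node's `ScaleService`: it serves
//! cluster-topology snapshots (`GetClusterInfo`) and manual, per-worker
//! rescheduling of streaming actors (`Reschedule`).
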
use risingwave_common::catalog::TableId;
use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
use risingwave_meta::model::TableParallelism;
use risingwave_meta::stream::{
    JobReschedulePlan, JobReschedulePostUpdates, RescheduleOptions, WorkerReschedule,
};
use risingwave_meta_model::FragmentId;
use risingwave_pb::common::WorkerType;
use risingwave_pb::meta::scale_service_server::ScaleService;
use risingwave_pb::meta::{
    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
    GetServerlessStreamingJobsStatusResponse, PbWorkerReschedule, RescheduleRequest,
    RescheduleResponse, UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
};
use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
use tonic::{Request, Response, Status};

use crate::barrier::BarrierManagerRef;
use crate::stream::GlobalStreamManagerRef;

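/// Serves the scaling-related RPCs on the meta node. Metadata is read through
/// [`MetadataManager`], while the actual rescheduling is delegated to the
/// global stream manager; the barrier manager is only consulted for status checks.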
pub struct ScaleServiceImpl {
    metadata_manager: MetadataManager,
    stream_manager: GlobalStreamManagerRef,
    barrier_manager: BarrierManagerRef,
    env: MetaSrvEnv,
}

impl ScaleServiceImpl {
    pub fn new(
        metadata_manager: MetadataManager,
        stream_manager: GlobalStreamManagerRef,
        barrier_manager: BarrierManagerRef,
        env: MetaSrvEnv,
    ) -> Self {
        Self {
            metadata_manager,
            stream_manager,
            barrier_manager,
            env,
        }
    }
}

#[async_trait::async_trait]
impl ScaleService for ScaleServiceImpl {
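    /// Returns a point-in-time snapshot of the cluster: compute workers, the
    /// fragment graph of every streaming job, per-actor source split
    /// assignments, and the registered sources.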
    async fn get_cluster_info(
        &self,
        _: Request<GetClusterInfoRequest>,
    ) -> Result<Response<GetClusterInfoResponse>, Status> {
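        // Hold the reschedule lock (read side) so that no rescheduling can run
        // while the snapshot below is being assembled.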
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;

        let stream_job_fragments = self
            .metadata_manager
            .catalog_controller
            .table_fragments()
            .await?;

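        // For every streaming job, resolve its fragments' upstream edges and actor
        // dispatchers so the full fragment graph can be serialized to protobuf.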
        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
        for (_, stream_job_fragments) in stream_job_fragments {
            let upstreams = self
                .metadata_manager
                .catalog_controller
                .upstream_fragments(stream_job_fragments.fragment_ids())
                .await?;
            let dispatchers = self
                .metadata_manager
                .catalog_controller
                .get_fragment_actor_dispatchers(
                    stream_job_fragments
                        .fragment_ids()
                        .map(|id| id as _)
                        .collect(),
                )
                .await?;
            table_fragments.push(stream_job_fragments.to_protobuf(&upstreams, &dispatchers))
        }

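        // Streaming actors only run on compute nodes, so restrict the worker listing accordingly.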
        let worker_nodes = self
            .metadata_manager
            .list_worker_node(Some(WorkerType::ComputeNode), None)
            .await?;

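        // Snapshot the current source split assignment of every actor from the
        // shared in-memory actor info.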
        let actor_splits = self
            .env
            .shared_actor_infos()
            .list_assignments()
            .into_iter()
            .map(|(actor_id, splits)| {
                (
                    actor_id,
                    ConnectorSplits {
                        splits: splits.iter().map(ConnectorSplit::from).collect(),
                    },
                )
            })
            .collect();

        let sources = self.metadata_manager.list_sources().await?;
        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();

        Ok(Response::new(GetClusterInfoResponse {
            worker_nodes,
            table_fragments,
            actor_splits,
            source_infos,
            revision: 0,
        }))
    }

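    /// Applies a manual reschedule plan: for each fragment, the request specifies
    /// per-worker actor count diffs. Requests are grouped by database, and every
    /// affected streaming job is switched to custom parallelism.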
    async fn reschedule(
        &self,
        request: Request<RescheduleRequest>,
    ) -> Result<Response<RescheduleResponse>, Status> {
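        // Rescheduling requires the barrier manager to be in the running state; bail out early otherwise.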
        self.barrier_manager.check_status_running()?;

        let RescheduleRequest {
            worker_reschedules,
            resolve_no_shuffle_upstream,
            ..
        } = request.into_inner();

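        // Take the reschedule lock exclusively and apply the reschedules one database at a time.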
        let _reschedule_job_lock = self.stream_manager.reschedule_lock_write_guard().await;
        for (database_id, worker_reschedules) in self
            .metadata_manager
            .split_fragment_map_by_database(worker_reschedules)
            .await?
        {
            let streaming_job_ids = self
                .metadata_manager
                .catalog_controller
                .get_fragment_job_id(
                    worker_reschedules
                        .keys()
                        .map(|id| *id as FragmentId)
                        .collect(),
                )
                .await?;

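            // Mark every streaming job touched by this reschedule as having custom parallelism.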
            let table_parallelisms = streaming_job_ids
                .into_iter()
                .map(|id| (TableId::new(id as _), TableParallelism::Custom))
                .collect();

            self.stream_manager
                .reschedule_actors(
                    database_id,
                    JobReschedulePlan {
                        reschedules: worker_reschedules
                            .into_iter()
                            .map(|(fragment_id, reschedule)| {
                                let PbWorkerReschedule { worker_actor_diff } = reschedule;
                                (
                                    fragment_id,
                                    WorkerReschedule {
                                        worker_actor_diff: worker_actor_diff
                                            .into_iter()
                                            .map(|(worker_id, diff)| (worker_id as _, diff as _))
                                            .collect(),
                                    },
                                )
                            })
                            .collect(),
                        post_updates: JobReschedulePostUpdates {
                            parallelism_updates: table_parallelisms,
                            resource_group_updates: Default::default(),
                        },
                    },
                    RescheduleOptions {
                        resolve_no_shuffle_upstream,
                        skip_create_new_actors: false,
                    },
                )
                .await?;
        }

        Ok(Response::new(RescheduleResponse {
            success: true,
            revision: 0,
        }))
    }

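    /// Not implemented yet; calling this RPC panics via `todo!()`.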
    async fn update_streaming_job_node_labels(
        &self,
        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
        todo!()
    }

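    /// Not implemented yet; calling this RPC panics via `todo!()`.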
    async fn get_serverless_streaming_jobs_status(
        &self,
        _request: Request<GetServerlessStreamingJobsStatusRequest>,
    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
        todo!()
    }
}