risingwave_meta_service/
scale_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::{MetaSrvEnv, MetadataManager};
16use risingwave_pb::common::WorkerType;
17use risingwave_pb::meta::scale_service_server::ScaleService;
18use risingwave_pb::meta::{
19    GetClusterInfoRequest, GetClusterInfoResponse, GetServerlessStreamingJobsStatusRequest,
20    GetServerlessStreamingJobsStatusResponse, RescheduleRequest, RescheduleResponse,
21    UpdateStreamingJobNodeLabelsRequest, UpdateStreamingJobNodeLabelsResponse,
22};
23use risingwave_pb::source::{ConnectorSplit, ConnectorSplits};
24use tonic::{Request, Response, Status};
25
26use crate::barrier::BarrierManagerRef;
27use crate::stream::GlobalStreamManagerRef;
28
29pub struct ScaleServiceImpl {
30    metadata_manager: MetadataManager,
31    stream_manager: GlobalStreamManagerRef,
32    env: MetaSrvEnv,
33}
34
35impl ScaleServiceImpl {
36    pub fn new(
37        metadata_manager: MetadataManager,
38        stream_manager: GlobalStreamManagerRef,
39        _barrier_manager: BarrierManagerRef,
40        env: MetaSrvEnv,
41    ) -> Self {
42        Self {
43            metadata_manager,
44            stream_manager,
45            env,
46        }
47    }
48}
49
50#[async_trait::async_trait]
51impl ScaleService for ScaleServiceImpl {
52    async fn get_cluster_info(
53        &self,
54        _: Request<GetClusterInfoRequest>,
55    ) -> Result<Response<GetClusterInfoResponse>, Status> {
56        let _reschedule_job_lock = self.stream_manager.reschedule_lock_read_guard().await;
57
58        let stream_job_fragments = self
59            .metadata_manager
60            .catalog_controller
61            .table_fragments()
62            .await?;
63
64        let mut table_fragments = Vec::with_capacity(stream_job_fragments.len());
65        for (_, (stream_job_fragments, fragment_actors, actor_status)) in stream_job_fragments {
66            let upstreams = self
67                .metadata_manager
68                .catalog_controller
69                .upstream_fragments(stream_job_fragments.fragment_ids())
70                .await?;
71            let dispatchers = self
72                .metadata_manager
73                .catalog_controller
74                .get_fragment_actor_dispatchers(
75                    stream_job_fragments
76                        .fragment_ids()
77                        .map(|id| id as _)
78                        .collect(),
79                )
80                .await?;
81            table_fragments.push(stream_job_fragments.to_protobuf(
82                &fragment_actors,
83                &upstreams,
84                &dispatchers,
85                actor_status,
86            ))
87        }
88
89        let worker_nodes = self
90            .metadata_manager
91            .list_worker_node(Some(WorkerType::ComputeNode), None)
92            .await?;
93
94        let actor_splits = self
95            .env
96            .shared_actor_infos()
97            .list_assignments()
98            .into_iter()
99            .map(|(actor_id, splits)| {
100                (
101                    actor_id,
102                    ConnectorSplits {
103                        splits: splits.iter().map(ConnectorSplit::from).collect(),
104                    },
105                )
106            })
107            .collect();
108
109        let sources = self.metadata_manager.list_sources().await?;
110        let source_infos = sources.into_iter().map(|s| (s.id, s)).collect();
111
112        Ok(Response::new(GetClusterInfoResponse {
113            worker_nodes,
114            table_fragments,
115            actor_splits,
116            source_infos,
117            revision: 0,
118        }))
119    }
120
121    async fn reschedule(
122        &self,
123        _request: Request<RescheduleRequest>,
124    ) -> Result<Response<RescheduleResponse>, Status> {
125        Ok(Response::new(RescheduleResponse {
126            success: false,
127            revision: 0,
128        }))
129    }
130
131    async fn update_streaming_job_node_labels(
132        &self,
133        _request: Request<UpdateStreamingJobNodeLabelsRequest>,
134    ) -> Result<Response<UpdateStreamingJobNodeLabelsResponse>, Status> {
135        todo!()
136    }
137
138    async fn get_serverless_streaming_jobs_status(
139        &self,
140        _request: Request<GetServerlessStreamingJobsStatusRequest>,
141    ) -> Result<Response<GetServerlessStreamingJobsStatusResponse>, Status> {
142        todo!()
143    }
144}