risingwave_meta_service/
cluster_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use itertools::Itertools;
16use risingwave_meta::barrier::BarrierManagerRef;
17use risingwave_meta::manager::MetadataManager;
18use risingwave_pb::common::HostAddress;
19use risingwave_pb::common::worker_node::State;
20use risingwave_pb::meta::cluster_service_server::ClusterService;
21use risingwave_pb::meta::{
22    ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
23    AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
24    GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
25    GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
26    UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse,
27};
28use tonic::{Request, Response, Status};
29
30use crate::MetaError;
31
32#[derive(Clone)]
33pub struct ClusterServiceImpl {
34    metadata_manager: MetadataManager,
35    barrier_manager: BarrierManagerRef,
36}
37
38impl ClusterServiceImpl {
39    pub fn new(metadata_manager: MetadataManager, barrier_manager: BarrierManagerRef) -> Self {
40        ClusterServiceImpl {
41            metadata_manager,
42            barrier_manager,
43        }
44    }
45}
46
47#[async_trait::async_trait]
48impl ClusterService for ClusterServiceImpl {
49    async fn add_worker_node(
50        &self,
51        request: Request<AddWorkerNodeRequest>,
52    ) -> Result<Response<AddWorkerNodeResponse>, Status> {
53        let req = request.into_inner();
54        let worker_type = req.get_worker_type()?;
55        let host: HostAddress = req.get_host()?.clone();
56        let property = req
57            .property
58            .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
59        let resource = req.resource.unwrap_or_default();
60        let worker_id = self
61            .metadata_manager
62            .add_worker_node(worker_type, host, property, resource)
63            .await?;
64        let cluster_id = self.metadata_manager.cluster_id().to_string();
65
66        Ok(Response::new(AddWorkerNodeResponse {
67            node_id: Some(worker_id),
68            cluster_id,
69        }))
70    }
71
72    /// Update schedulability of a compute node. Will not affect actors which are already running on
73    /// that node, if marked as unschedulable
74    async fn update_worker_node_schedulability(
75        &self,
76        req: Request<UpdateWorkerNodeSchedulabilityRequest>,
77    ) -> Result<Response<UpdateWorkerNodeSchedulabilityResponse>, Status> {
78        let req = req.into_inner();
79        let schedulability = req.get_schedulability()?;
80        let worker_ids = req.worker_ids;
81
82        self.metadata_manager
83            .cluster_controller
84            .update_schedulability(worker_ids.into_iter().map_into().collect(), schedulability)
85            .await?;
86
87        Ok(Response::new(UpdateWorkerNodeSchedulabilityResponse {
88            status: None,
89        }))
90    }
91
92    async fn activate_worker_node(
93        &self,
94        request: Request<ActivateWorkerNodeRequest>,
95    ) -> Result<Response<ActivateWorkerNodeResponse>, Status> {
96        let req = request.into_inner();
97        let host = req.get_host()?.clone();
98        #[cfg(not(madsim))]
99        {
100            use risingwave_common::util::addr::try_resolve_dns;
101            use tracing::{error, info};
102            let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
103                error!(e);
104                Status::internal(e)
105            })?;
106            info!(?socket_addr, ?host, "resolve host addr");
107        }
108        self.metadata_manager
109            .cluster_controller
110            .activate_worker(req.node_id)
111            .await?;
112        Ok(Response::new(ActivateWorkerNodeResponse { status: None }))
113    }
114
115    async fn delete_worker_node(
116        &self,
117        request: Request<DeleteWorkerNodeRequest>,
118    ) -> Result<Response<DeleteWorkerNodeResponse>, Status> {
119        let req = request.into_inner();
120        let host = req.get_host()?.clone();
121
122        let worker_node = self
123            .metadata_manager
124            .cluster_controller
125            .delete_worker(host)
126            .await?;
127        tracing::info!(
128            host = ?worker_node.host,
129            id = %worker_node.id,
130            r#type = ?worker_node.r#type(),
131            "deleted worker node",
132        );
133
134        Ok(Response::new(DeleteWorkerNodeResponse { status: None }))
135    }
136
137    async fn list_all_nodes(
138        &self,
139        request: Request<ListAllNodesRequest>,
140    ) -> Result<Response<ListAllNodesResponse>, Status> {
141        let req = request.into_inner();
142        let worker_type = req.worker_type.map(|wt| wt.try_into().unwrap());
143        let worker_states = if req.include_starting_nodes {
144            None
145        } else {
146            Some(State::Running)
147        };
148
149        let node_list = self
150            .metadata_manager
151            .list_worker_node(worker_type, worker_states)
152            .await?;
153        Ok(Response::new(ListAllNodesResponse {
154            status: None,
155            nodes: node_list,
156        }))
157    }
158
159    async fn get_cluster_recovery_status(
160        &self,
161        _request: Request<GetClusterRecoveryStatusRequest>,
162    ) -> Result<Response<GetClusterRecoveryStatusResponse>, Status> {
163        Ok(Response::new(GetClusterRecoveryStatusResponse {
164            status: self.barrier_manager.get_recovery_status() as _,
165        }))
166    }
167
168    async fn get_meta_store_info(
169        &self,
170        _request: Request<GetMetaStoreInfoRequest>,
171    ) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
172        Ok(Response::new(GetMetaStoreInfoResponse {
173            meta_store_endpoint: self
174                .metadata_manager
175                .cluster_controller
176                .meta_store_endpoint(),
177        }))
178    }
179}