risingwave_meta_service/
cluster_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::barrier::BarrierManagerRef;
16use risingwave_meta::manager::MetadataManager;
17use risingwave_meta_model::WorkerId;
18use risingwave_pb::common::HostAddress;
19use risingwave_pb::common::worker_node::State;
20use risingwave_pb::meta::cluster_service_server::ClusterService;
21use risingwave_pb::meta::{
22    ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
23    AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
24    GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
25    GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
26    UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse,
27};
28use tonic::{Request, Response, Status};
29
30use crate::MetaError;
31
32#[derive(Clone)]
33pub struct ClusterServiceImpl {
34    metadata_manager: MetadataManager,
35    barrier_manager: BarrierManagerRef,
36}
37
38impl ClusterServiceImpl {
39    pub fn new(metadata_manager: MetadataManager, barrier_manager: BarrierManagerRef) -> Self {
40        ClusterServiceImpl {
41            metadata_manager,
42            barrier_manager,
43        }
44    }
45}
46
47#[async_trait::async_trait]
48impl ClusterService for ClusterServiceImpl {
49    async fn add_worker_node(
50        &self,
51        request: Request<AddWorkerNodeRequest>,
52    ) -> Result<Response<AddWorkerNodeResponse>, Status> {
53        let req = request.into_inner();
54        let worker_type = req.get_worker_type()?;
55        let host: HostAddress = req.get_host()?.clone();
56        let property = req
57            .property
58            .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
59        let resource = req.resource.unwrap_or_default();
60        let worker_id = self
61            .metadata_manager
62            .add_worker_node(worker_type, host, property, resource)
63            .await?;
64        let cluster_id = self.metadata_manager.cluster_id().to_string();
65
66        Ok(Response::new(AddWorkerNodeResponse {
67            node_id: Some(worker_id as _),
68            cluster_id,
69        }))
70    }
71
72    /// Update schedulability of a compute node. Will not affect actors which are already running on
73    /// that node, if marked as unschedulable
74    async fn update_worker_node_schedulability(
75        &self,
76        req: Request<UpdateWorkerNodeSchedulabilityRequest>,
77    ) -> Result<Response<UpdateWorkerNodeSchedulabilityResponse>, Status> {
78        let req = req.into_inner();
79        let schedulability = req.get_schedulability()?;
80        let worker_ids = req.worker_ids;
81
82        self.metadata_manager
83            .cluster_controller
84            .update_schedulability(
85                worker_ids.into_iter().map(|id| id as WorkerId).collect(),
86                schedulability,
87            )
88            .await?;
89
90        Ok(Response::new(UpdateWorkerNodeSchedulabilityResponse {
91            status: None,
92        }))
93    }
94
95    async fn activate_worker_node(
96        &self,
97        request: Request<ActivateWorkerNodeRequest>,
98    ) -> Result<Response<ActivateWorkerNodeResponse>, Status> {
99        let req = request.into_inner();
100        let host = req.get_host()?.clone();
101        #[cfg(not(madsim))]
102        {
103            use risingwave_common::util::addr::try_resolve_dns;
104            use tracing::{error, info};
105            let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
106                error!(e);
107                Status::internal(e)
108            })?;
109            info!(?socket_addr, ?host, "resolve host addr");
110        }
111        self.metadata_manager
112            .cluster_controller
113            .activate_worker(req.node_id as _)
114            .await?;
115        Ok(Response::new(ActivateWorkerNodeResponse { status: None }))
116    }
117
118    async fn delete_worker_node(
119        &self,
120        request: Request<DeleteWorkerNodeRequest>,
121    ) -> Result<Response<DeleteWorkerNodeResponse>, Status> {
122        let req = request.into_inner();
123        let host = req.get_host()?.clone();
124
125        let worker_node = self
126            .metadata_manager
127            .cluster_controller
128            .delete_worker(host)
129            .await?;
130        tracing::info!(
131            host = ?worker_node.host,
132            id = worker_node.id,
133            r#type = ?worker_node.r#type(),
134            "deleted worker node",
135        );
136
137        Ok(Response::new(DeleteWorkerNodeResponse { status: None }))
138    }
139
140    async fn list_all_nodes(
141        &self,
142        request: Request<ListAllNodesRequest>,
143    ) -> Result<Response<ListAllNodesResponse>, Status> {
144        let req = request.into_inner();
145        let worker_type = req.worker_type.map(|wt| wt.try_into().unwrap());
146        let worker_states = if req.include_starting_nodes {
147            None
148        } else {
149            Some(State::Running)
150        };
151
152        let node_list = self
153            .metadata_manager
154            .list_worker_node(worker_type, worker_states)
155            .await?;
156        Ok(Response::new(ListAllNodesResponse {
157            status: None,
158            nodes: node_list,
159        }))
160    }
161
162    async fn get_cluster_recovery_status(
163        &self,
164        _request: Request<GetClusterRecoveryStatusRequest>,
165    ) -> Result<Response<GetClusterRecoveryStatusResponse>, Status> {
166        Ok(Response::new(GetClusterRecoveryStatusResponse {
167            status: self.barrier_manager.get_recovery_status() as _,
168        }))
169    }
170
171    async fn get_meta_store_info(
172        &self,
173        _request: Request<GetMetaStoreInfoRequest>,
174    ) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
175        Ok(Response::new(GetMetaStoreInfoResponse {
176            meta_store_endpoint: self
177                .metadata_manager
178                .cluster_controller
179                .meta_store_endpoint(),
180        }))
181    }
182}