risingwave_meta_service/
cluster_service.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::RW_VERSION;
16use risingwave_meta::barrier::BarrierManagerRef;
17use risingwave_meta::manager::MetadataManager;
18use risingwave_pb::common::worker_node::State;
19use risingwave_pb::common::{HostAddress, WorkerType as PbWorkerType};
20use risingwave_pb::meta::cluster_service_server::ClusterService;
21use risingwave_pb::meta::{
22    ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
23    AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
24    GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
25    GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
26};
27use tonic::{Request, Response, Status};
28
29use crate::MetaError;
30
31#[derive(Clone)]
32pub struct ClusterServiceImpl {
33    metadata_manager: MetadataManager,
34    barrier_manager: BarrierManagerRef,
35}
36
37impl ClusterServiceImpl {
38    pub fn new(metadata_manager: MetadataManager, barrier_manager: BarrierManagerRef) -> Self {
39        ClusterServiceImpl {
40            metadata_manager,
41            barrier_manager,
42        }
43    }
44}
45
46#[async_trait::async_trait]
47impl ClusterService for ClusterServiceImpl {
48    async fn add_worker_node(
49        &self,
50        request: Request<AddWorkerNodeRequest>,
51    ) -> Result<Response<AddWorkerNodeResponse>, Status> {
52        let req = request.into_inner();
53        let worker_type = req.get_worker_type()?;
54        let host: HostAddress = req.get_host()?.clone();
55        let property = req
56            .property
57            .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
58        let resource = req.resource.unwrap_or_default();
59        if matches!(
60            worker_type,
61            PbWorkerType::Frontend | PbWorkerType::ComputeNode | PbWorkerType::Compactor
62        ) && resource.rw_version != RW_VERSION
63        {
64            return Err(MetaError::invalid_parameter(format!(
65                "worker node version {} does not match meta node version {}",
66                resource.rw_version, RW_VERSION,
67            ))
68            .into());
69        }
70        let worker_id = self
71            .metadata_manager
72            .add_worker_node(worker_type, host, property, resource)
73            .await?;
74        let cluster_id = self.metadata_manager.cluster_id().to_string();
75
76        Ok(Response::new(AddWorkerNodeResponse {
77            node_id: Some(worker_id),
78            cluster_id,
79        }))
80    }
81
82    async fn activate_worker_node(
83        &self,
84        request: Request<ActivateWorkerNodeRequest>,
85    ) -> Result<Response<ActivateWorkerNodeResponse>, Status> {
86        let req = request.into_inner();
87        let host = req.get_host()?.clone();
88        #[cfg(not(madsim))]
89        {
90            use risingwave_common::util::addr::try_resolve_dns;
91            use tracing::{error, info};
92            let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
93                error!(e);
94                Status::internal(e)
95            })?;
96            info!(?socket_addr, ?host, "resolve host addr");
97        }
98        self.metadata_manager
99            .cluster_controller
100            .activate_worker(req.node_id)
101            .await?;
102        Ok(Response::new(ActivateWorkerNodeResponse { status: None }))
103    }
104
105    async fn delete_worker_node(
106        &self,
107        request: Request<DeleteWorkerNodeRequest>,
108    ) -> Result<Response<DeleteWorkerNodeResponse>, Status> {
109        let req = request.into_inner();
110        let host = req.get_host()?.clone();
111
112        let worker_node = self
113            .metadata_manager
114            .cluster_controller
115            .delete_worker(host)
116            .await?;
117        tracing::info!(
118            host = ?worker_node.host,
119            id = %worker_node.id,
120            r#type = ?worker_node.r#type(),
121            "deleted worker node",
122        );
123
124        Ok(Response::new(DeleteWorkerNodeResponse { status: None }))
125    }
126
127    async fn list_all_nodes(
128        &self,
129        request: Request<ListAllNodesRequest>,
130    ) -> Result<Response<ListAllNodesResponse>, Status> {
131        let req = request.into_inner();
132        let worker_type = req.worker_type.map(|wt| wt.try_into().unwrap());
133        let worker_states = if req.include_starting_nodes {
134            None
135        } else {
136            Some(State::Running)
137        };
138
139        let node_list = self
140            .metadata_manager
141            .list_worker_node(worker_type, worker_states)
142            .await?;
143        Ok(Response::new(ListAllNodesResponse {
144            status: None,
145            nodes: node_list,
146        }))
147    }
148
149    async fn get_cluster_recovery_status(
150        &self,
151        _request: Request<GetClusterRecoveryStatusRequest>,
152    ) -> Result<Response<GetClusterRecoveryStatusResponse>, Status> {
153        Ok(Response::new(GetClusterRecoveryStatusResponse {
154            status: self.barrier_manager.get_recovery_status() as _,
155        }))
156    }
157
158    async fn get_meta_store_info(
159        &self,
160        _request: Request<GetMetaStoreInfoRequest>,
161    ) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
162        Ok(Response::new(GetMetaStoreInfoResponse {
163            meta_store_endpoint: self
164                .metadata_manager
165                .cluster_controller
166                .meta_store_endpoint(),
167        }))
168    }
169}