risingwave_meta_service/
cluster_service.rs
1use risingwave_meta::barrier::BarrierManagerRef;
16use risingwave_meta::manager::MetadataManager;
17use risingwave_meta_model::WorkerId;
18use risingwave_pb::common::HostAddress;
19use risingwave_pb::common::worker_node::State;
20use risingwave_pb::meta::cluster_service_server::ClusterService;
21use risingwave_pb::meta::{
22 ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
23 AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
24 GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
25 GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
26 UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse,
27};
28use tonic::{Request, Response, Status};
29
30use crate::MetaError;
31
32#[derive(Clone)]
33pub struct ClusterServiceImpl {
34 metadata_manager: MetadataManager,
35 barrier_manager: BarrierManagerRef,
36}
37
38impl ClusterServiceImpl {
39 pub fn new(metadata_manager: MetadataManager, barrier_manager: BarrierManagerRef) -> Self {
40 ClusterServiceImpl {
41 metadata_manager,
42 barrier_manager,
43 }
44 }
45}
46
47#[async_trait::async_trait]
48impl ClusterService for ClusterServiceImpl {
49 async fn add_worker_node(
50 &self,
51 request: Request<AddWorkerNodeRequest>,
52 ) -> Result<Response<AddWorkerNodeResponse>, Status> {
53 let req = request.into_inner();
54 let worker_type = req.get_worker_type()?;
55 let host: HostAddress = req.get_host()?.clone();
56 let property = req
57 .property
58 .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
59 let resource = req.resource.unwrap_or_default();
60 let worker_id = self
61 .metadata_manager
62 .add_worker_node(worker_type, host, property, resource)
63 .await?;
64 let cluster_id = self.metadata_manager.cluster_id().to_string();
65
66 Ok(Response::new(AddWorkerNodeResponse {
67 node_id: Some(worker_id as _),
68 cluster_id,
69 }))
70 }
71
72 async fn update_worker_node_schedulability(
75 &self,
76 req: Request<UpdateWorkerNodeSchedulabilityRequest>,
77 ) -> Result<Response<UpdateWorkerNodeSchedulabilityResponse>, Status> {
78 let req = req.into_inner();
79 let schedulability = req.get_schedulability()?;
80 let worker_ids = req.worker_ids;
81
82 self.metadata_manager
83 .cluster_controller
84 .update_schedulability(
85 worker_ids.into_iter().map(|id| id as WorkerId).collect(),
86 schedulability,
87 )
88 .await?;
89
90 Ok(Response::new(UpdateWorkerNodeSchedulabilityResponse {
91 status: None,
92 }))
93 }
94
95 async fn activate_worker_node(
96 &self,
97 request: Request<ActivateWorkerNodeRequest>,
98 ) -> Result<Response<ActivateWorkerNodeResponse>, Status> {
99 let req = request.into_inner();
100 let host = req.get_host()?.clone();
101 #[cfg(not(madsim))]
102 {
103 use risingwave_common::util::addr::try_resolve_dns;
104 use tracing::{error, info};
105 let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
106 error!(e);
107 Status::internal(e)
108 })?;
109 info!(?socket_addr, ?host, "resolve host addr");
110 }
111 self.metadata_manager
112 .cluster_controller
113 .activate_worker(req.node_id as _)
114 .await?;
115 Ok(Response::new(ActivateWorkerNodeResponse { status: None }))
116 }
117
118 async fn delete_worker_node(
119 &self,
120 request: Request<DeleteWorkerNodeRequest>,
121 ) -> Result<Response<DeleteWorkerNodeResponse>, Status> {
122 let req = request.into_inner();
123 let host = req.get_host()?.clone();
124
125 let worker_node = self
126 .metadata_manager
127 .cluster_controller
128 .delete_worker(host)
129 .await?;
130 tracing::info!(
131 host = ?worker_node.host,
132 id = worker_node.id,
133 r#type = ?worker_node.r#type(),
134 "deleted worker node",
135 );
136
137 Ok(Response::new(DeleteWorkerNodeResponse { status: None }))
138 }
139
140 async fn list_all_nodes(
141 &self,
142 request: Request<ListAllNodesRequest>,
143 ) -> Result<Response<ListAllNodesResponse>, Status> {
144 let req = request.into_inner();
145 let worker_type = req.worker_type.map(|wt| wt.try_into().unwrap());
146 let worker_states = if req.include_starting_nodes {
147 None
148 } else {
149 Some(State::Running)
150 };
151
152 let node_list = self
153 .metadata_manager
154 .list_worker_node(worker_type, worker_states)
155 .await?;
156 Ok(Response::new(ListAllNodesResponse {
157 status: None,
158 nodes: node_list,
159 }))
160 }
161
162 async fn get_cluster_recovery_status(
163 &self,
164 _request: Request<GetClusterRecoveryStatusRequest>,
165 ) -> Result<Response<GetClusterRecoveryStatusResponse>, Status> {
166 Ok(Response::new(GetClusterRecoveryStatusResponse {
167 status: self.barrier_manager.get_recovery_status() as _,
168 }))
169 }
170
171 async fn get_meta_store_info(
172 &self,
173 _request: Request<GetMetaStoreInfoRequest>,
174 ) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
175 Ok(Response::new(GetMetaStoreInfoResponse {
176 meta_store_endpoint: self
177 .metadata_manager
178 .cluster_controller
179 .meta_store_endpoint(),
180 }))
181 }
182}