risingwave_meta_service/
cluster_service.rs1use itertools::Itertools;
16use risingwave_meta::barrier::BarrierManagerRef;
17use risingwave_meta::manager::MetadataManager;
18use risingwave_pb::common::HostAddress;
19use risingwave_pb::common::worker_node::State;
20use risingwave_pb::meta::cluster_service_server::ClusterService;
21use risingwave_pb::meta::{
22 ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
23 AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
24 GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
25 GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
26 UpdateWorkerNodeSchedulabilityRequest, UpdateWorkerNodeSchedulabilityResponse,
27};
28use tonic::{Request, Response, Status};
29
30use crate::MetaError;
31
32#[derive(Clone)]
33pub struct ClusterServiceImpl {
34 metadata_manager: MetadataManager,
35 barrier_manager: BarrierManagerRef,
36}
37
38impl ClusterServiceImpl {
39 pub fn new(metadata_manager: MetadataManager, barrier_manager: BarrierManagerRef) -> Self {
40 ClusterServiceImpl {
41 metadata_manager,
42 barrier_manager,
43 }
44 }
45}
46
47#[async_trait::async_trait]
48impl ClusterService for ClusterServiceImpl {
49 async fn add_worker_node(
50 &self,
51 request: Request<AddWorkerNodeRequest>,
52 ) -> Result<Response<AddWorkerNodeResponse>, Status> {
53 let req = request.into_inner();
54 let worker_type = req.get_worker_type()?;
55 let host: HostAddress = req.get_host()?.clone();
56 let property = req
57 .property
58 .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
59 let resource = req.resource.unwrap_or_default();
60 let worker_id = self
61 .metadata_manager
62 .add_worker_node(worker_type, host, property, resource)
63 .await?;
64 let cluster_id = self.metadata_manager.cluster_id().to_string();
65
66 Ok(Response::new(AddWorkerNodeResponse {
67 node_id: Some(worker_id),
68 cluster_id,
69 }))
70 }
71
72 async fn update_worker_node_schedulability(
75 &self,
76 req: Request<UpdateWorkerNodeSchedulabilityRequest>,
77 ) -> Result<Response<UpdateWorkerNodeSchedulabilityResponse>, Status> {
78 let req = req.into_inner();
79 let schedulability = req.get_schedulability()?;
80 let worker_ids = req.worker_ids;
81
82 self.metadata_manager
83 .cluster_controller
84 .update_schedulability(worker_ids.into_iter().map_into().collect(), schedulability)
85 .await?;
86
87 Ok(Response::new(UpdateWorkerNodeSchedulabilityResponse {
88 status: None,
89 }))
90 }
91
92 async fn activate_worker_node(
93 &self,
94 request: Request<ActivateWorkerNodeRequest>,
95 ) -> Result<Response<ActivateWorkerNodeResponse>, Status> {
96 let req = request.into_inner();
97 let host = req.get_host()?.clone();
98 #[cfg(not(madsim))]
99 {
100 use risingwave_common::util::addr::try_resolve_dns;
101 use tracing::{error, info};
102 let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
103 error!(e);
104 Status::internal(e)
105 })?;
106 info!(?socket_addr, ?host, "resolve host addr");
107 }
108 self.metadata_manager
109 .cluster_controller
110 .activate_worker(req.node_id)
111 .await?;
112 Ok(Response::new(ActivateWorkerNodeResponse { status: None }))
113 }
114
115 async fn delete_worker_node(
116 &self,
117 request: Request<DeleteWorkerNodeRequest>,
118 ) -> Result<Response<DeleteWorkerNodeResponse>, Status> {
119 let req = request.into_inner();
120 let host = req.get_host()?.clone();
121
122 let worker_node = self
123 .metadata_manager
124 .cluster_controller
125 .delete_worker(host)
126 .await?;
127 tracing::info!(
128 host = ?worker_node.host,
129 id = %worker_node.id,
130 r#type = ?worker_node.r#type(),
131 "deleted worker node",
132 );
133
134 Ok(Response::new(DeleteWorkerNodeResponse { status: None }))
135 }
136
137 async fn list_all_nodes(
138 &self,
139 request: Request<ListAllNodesRequest>,
140 ) -> Result<Response<ListAllNodesResponse>, Status> {
141 let req = request.into_inner();
142 let worker_type = req.worker_type.map(|wt| wt.try_into().unwrap());
143 let worker_states = if req.include_starting_nodes {
144 None
145 } else {
146 Some(State::Running)
147 };
148
149 let node_list = self
150 .metadata_manager
151 .list_worker_node(worker_type, worker_states)
152 .await?;
153 Ok(Response::new(ListAllNodesResponse {
154 status: None,
155 nodes: node_list,
156 }))
157 }
158
159 async fn get_cluster_recovery_status(
160 &self,
161 _request: Request<GetClusterRecoveryStatusRequest>,
162 ) -> Result<Response<GetClusterRecoveryStatusResponse>, Status> {
163 Ok(Response::new(GetClusterRecoveryStatusResponse {
164 status: self.barrier_manager.get_recovery_status() as _,
165 }))
166 }
167
168 async fn get_meta_store_info(
169 &self,
170 _request: Request<GetMetaStoreInfoRequest>,
171 ) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
172 Ok(Response::new(GetMetaStoreInfoResponse {
173 meta_store_endpoint: self
174 .metadata_manager
175 .cluster_controller
176 .meta_store_endpoint(),
177 }))
178 }
179}