risingwave_meta_service/
cluster_service.rs1use risingwave_common::RW_VERSION;
16use risingwave_meta::barrier::BarrierManagerRef;
17use risingwave_meta::manager::MetadataManager;
18use risingwave_pb::common::worker_node::State;
19use risingwave_pb::common::{HostAddress, WorkerType as PbWorkerType};
20use risingwave_pb::meta::cluster_service_server::ClusterService;
21use risingwave_pb::meta::{
22 ActivateWorkerNodeRequest, ActivateWorkerNodeResponse, AddWorkerNodeRequest,
23 AddWorkerNodeResponse, DeleteWorkerNodeRequest, DeleteWorkerNodeResponse,
24 GetClusterRecoveryStatusRequest, GetClusterRecoveryStatusResponse, GetMetaStoreInfoRequest,
25 GetMetaStoreInfoResponse, ListAllNodesRequest, ListAllNodesResponse,
26};
27use tonic::{Request, Response, Status};
28
29use crate::MetaError;
30
31#[derive(Clone)]
32pub struct ClusterServiceImpl {
33 metadata_manager: MetadataManager,
34 barrier_manager: BarrierManagerRef,
35}
36
37impl ClusterServiceImpl {
38 pub fn new(metadata_manager: MetadataManager, barrier_manager: BarrierManagerRef) -> Self {
39 ClusterServiceImpl {
40 metadata_manager,
41 barrier_manager,
42 }
43 }
44}
45
46#[async_trait::async_trait]
47impl ClusterService for ClusterServiceImpl {
48 async fn add_worker_node(
49 &self,
50 request: Request<AddWorkerNodeRequest>,
51 ) -> Result<Response<AddWorkerNodeResponse>, Status> {
52 let req = request.into_inner();
53 let worker_type = req.get_worker_type()?;
54 let host: HostAddress = req.get_host()?.clone();
55 let property = req
56 .property
57 .ok_or_else(|| MetaError::invalid_parameter("worker node property is not provided"))?;
58 let resource = req.resource.unwrap_or_default();
59 if matches!(
60 worker_type,
61 PbWorkerType::Frontend | PbWorkerType::ComputeNode | PbWorkerType::Compactor
62 ) && resource.rw_version != RW_VERSION
63 {
64 return Err(MetaError::invalid_parameter(format!(
65 "worker node version {} does not match meta node version {}",
66 resource.rw_version, RW_VERSION,
67 ))
68 .into());
69 }
70 let worker_id = self
71 .metadata_manager
72 .add_worker_node(worker_type, host, property, resource)
73 .await?;
74 let cluster_id = self.metadata_manager.cluster_id().to_string();
75
76 Ok(Response::new(AddWorkerNodeResponse {
77 node_id: Some(worker_id),
78 cluster_id,
79 }))
80 }
81
82 async fn activate_worker_node(
83 &self,
84 request: Request<ActivateWorkerNodeRequest>,
85 ) -> Result<Response<ActivateWorkerNodeResponse>, Status> {
86 let req = request.into_inner();
87 let host = req.get_host()?.clone();
88 #[cfg(not(madsim))]
89 {
90 use risingwave_common::util::addr::try_resolve_dns;
91 use tracing::{error, info};
92 let socket_addr = try_resolve_dns(&host.host, host.port).await.map_err(|e| {
93 error!(e);
94 Status::internal(e)
95 })?;
96 info!(?socket_addr, ?host, "resolve host addr");
97 }
98 self.metadata_manager
99 .cluster_controller
100 .activate_worker(req.node_id)
101 .await?;
102 Ok(Response::new(ActivateWorkerNodeResponse { status: None }))
103 }
104
105 async fn delete_worker_node(
106 &self,
107 request: Request<DeleteWorkerNodeRequest>,
108 ) -> Result<Response<DeleteWorkerNodeResponse>, Status> {
109 let req = request.into_inner();
110 let host = req.get_host()?.clone();
111
112 let worker_node = self
113 .metadata_manager
114 .cluster_controller
115 .delete_worker(host)
116 .await?;
117 tracing::info!(
118 host = ?worker_node.host,
119 id = %worker_node.id,
120 r#type = ?worker_node.r#type(),
121 "deleted worker node",
122 );
123
124 Ok(Response::new(DeleteWorkerNodeResponse { status: None }))
125 }
126
127 async fn list_all_nodes(
128 &self,
129 request: Request<ListAllNodesRequest>,
130 ) -> Result<Response<ListAllNodesResponse>, Status> {
131 let req = request.into_inner();
132 let worker_type = req.worker_type.map(|wt| wt.try_into().unwrap());
133 let worker_states = if req.include_starting_nodes {
134 None
135 } else {
136 Some(State::Running)
137 };
138
139 let node_list = self
140 .metadata_manager
141 .list_worker_node(worker_type, worker_states)
142 .await?;
143 Ok(Response::new(ListAllNodesResponse {
144 status: None,
145 nodes: node_list,
146 }))
147 }
148
149 async fn get_cluster_recovery_status(
150 &self,
151 _request: Request<GetClusterRecoveryStatusRequest>,
152 ) -> Result<Response<GetClusterRecoveryStatusResponse>, Status> {
153 Ok(Response::new(GetClusterRecoveryStatusResponse {
154 status: self.barrier_manager.get_recovery_status() as _,
155 }))
156 }
157
158 async fn get_meta_store_info(
159 &self,
160 _request: Request<GetMetaStoreInfoRequest>,
161 ) -> Result<Response<GetMetaStoreInfoResponse>, Status> {
162 Ok(Response::new(GetMetaStoreInfoResponse {
163 meta_store_endpoint: self
164 .metadata_manager
165 .cluster_controller
166 .meta_store_endpoint(),
167 }))
168 }
169}