risingwave_meta_service/
heartbeat_service.rs1use risingwave_meta::manager::MetadataManager;
16use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
17use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
18use thiserror_ext::AsReport;
19use tonic::{Request, Response, Status};
20
21#[derive(Clone)]
22pub struct HeartbeatServiceImpl {
23 metadata_manager: MetadataManager,
24}
25
26impl HeartbeatServiceImpl {
27 pub fn new(metadata_manager: MetadataManager) -> Self {
28 HeartbeatServiceImpl { metadata_manager }
29 }
30}
31
32#[async_trait::async_trait]
33impl HeartbeatService for HeartbeatServiceImpl {
34 async fn heartbeat(
35 &self,
36 request: Request<HeartbeatRequest>,
37 ) -> Result<Response<HeartbeatResponse>, Status> {
38 let req = request.into_inner();
39 let result = self
40 .metadata_manager
41 .cluster_controller
42 .heartbeat(req.node_id as _)
43 .await;
44
45 match result {
46 Ok(_) => Ok(Response::new(HeartbeatResponse { status: None })),
47 Err(e) => {
48 if e.is_invalid_worker() {
49 return Ok(Response::new(HeartbeatResponse {
50 status: Some(risingwave_pb::common::Status {
51 code: risingwave_pb::common::status::Code::UnknownWorker as i32,
52 message: e.to_report_string(),
53 }),
54 }));
55 }
56 Err(e.into())
57 }
58 }
59 }
60}