risingwave_meta_service/
heartbeat_service.rs1use risingwave_meta::manager::MetadataManager;
16use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
17use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
18use thiserror_ext::AsReport;
19use tonic::{Request, Response, Status};
20
21#[derive(Clone)]
22pub struct HeartbeatServiceImpl {
23 metadata_manager: MetadataManager,
24}
25
26impl HeartbeatServiceImpl {
27 pub fn new(metadata_manager: MetadataManager) -> Self {
28 HeartbeatServiceImpl { metadata_manager }
29 }
30}
31
32#[async_trait::async_trait]
33impl HeartbeatService for HeartbeatServiceImpl {
34 #[cfg_attr(coverage, coverage(off))]
35 async fn heartbeat(
36 &self,
37 request: Request<HeartbeatRequest>,
38 ) -> Result<Response<HeartbeatResponse>, Status> {
39 let req = request.into_inner();
40 let result = self
41 .metadata_manager
42 .cluster_controller
43 .heartbeat(req.node_id as _)
44 .await;
45
46 match result {
47 Ok(_) => Ok(Response::new(HeartbeatResponse { status: None })),
48 Err(e) => {
49 if e.is_invalid_worker() {
50 return Ok(Response::new(HeartbeatResponse {
51 status: Some(risingwave_pb::common::Status {
52 code: risingwave_pb::common::status::Code::UnknownWorker as i32,
53 message: e.to_report_string(),
54 }),
55 }));
56 }
57 Err(e.into())
58 }
59 }
60 }
61}