risingwave_meta_service/
heartbeat_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::MetadataManager;
16use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
17use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
18use thiserror_ext::AsReport;
19use tonic::{Request, Response, Status};
20
21#[derive(Clone)]
22pub struct HeartbeatServiceImpl {
23    metadata_manager: MetadataManager,
24}
25
26impl HeartbeatServiceImpl {
27    pub fn new(metadata_manager: MetadataManager) -> Self {
28        HeartbeatServiceImpl { metadata_manager }
29    }
30}
31
32#[async_trait::async_trait]
33impl HeartbeatService for HeartbeatServiceImpl {
34    #[cfg_attr(coverage, coverage(off))]
35    async fn heartbeat(
36        &self,
37        request: Request<HeartbeatRequest>,
38    ) -> Result<Response<HeartbeatResponse>, Status> {
39        let req = request.into_inner();
40        let result = self
41            .metadata_manager
42            .cluster_controller
43            .heartbeat(req.node_id as _)
44            .await;
45
46        match result {
47            Ok(_) => Ok(Response::new(HeartbeatResponse { status: None })),
48            Err(e) => {
49                if e.is_invalid_worker() {
50                    return Ok(Response::new(HeartbeatResponse {
51                        status: Some(risingwave_pb::common::Status {
52                            code: risingwave_pb::common::status::Code::UnknownWorker as i32,
53                            message: e.to_report_string(),
54                        }),
55                    }));
56                }
57                Err(e.into())
58            }
59        }
60    }
61}