risingwave_meta_service/
heartbeat_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_meta::manager::MetadataManager;
16use risingwave_pb::meta::heartbeat_service_server::HeartbeatService;
17use risingwave_pb::meta::{HeartbeatRequest, HeartbeatResponse};
18use thiserror_ext::AsReport;
19use tonic::{Request, Response, Status};
20
21#[derive(Clone)]
22pub struct HeartbeatServiceImpl {
23    metadata_manager: MetadataManager,
24}
25
26impl HeartbeatServiceImpl {
27    pub fn new(metadata_manager: MetadataManager) -> Self {
28        HeartbeatServiceImpl { metadata_manager }
29    }
30}
31
32#[async_trait::async_trait]
33impl HeartbeatService for HeartbeatServiceImpl {
34    async fn heartbeat(
35        &self,
36        request: Request<HeartbeatRequest>,
37    ) -> Result<Response<HeartbeatResponse>, Status> {
38        let req = request.into_inner();
39        let result = self
40            .metadata_manager
41            .cluster_controller
42            .heartbeat(req.node_id as _)
43            .await;
44
45        match result {
46            Ok(_) => Ok(Response::new(HeartbeatResponse { status: None })),
47            Err(e) => {
48                if e.is_invalid_worker() {
49                    return Ok(Response::new(HeartbeatResponse {
50                        status: Some(risingwave_pb::common::Status {
51                            code: risingwave_pb::common::status::Code::UnknownWorker as i32,
52                            message: e.to_report_string(),
53                        }),
54                    }));
55                }
56                Err(e.into())
57            }
58        }
59    }
60}