risingwave_meta_service/
meta_member_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::addr::HostAddr;
16use risingwave_meta::rpc::ElectionClientRef;
17use risingwave_pb::common::HostAddress;
18use risingwave_pb::meta::meta_member_service_server::MetaMemberService;
19use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember};
20use tonic::{Request, Response, Status};
21
22#[derive(Clone)]
23pub struct MetaMemberServiceImpl {
24    election_client: ElectionClientRef,
25}
26
27impl MetaMemberServiceImpl {
28    pub fn new(election_client: ElectionClientRef) -> Self {
29        MetaMemberServiceImpl { election_client }
30    }
31}
32
33#[async_trait::async_trait]
34impl MetaMemberService for MetaMemberServiceImpl {
35    #[cfg_attr(coverage, coverage(off))]
36    async fn members(
37        &self,
38        _request: Request<MembersRequest>,
39    ) -> Result<Response<MembersResponse>, Status> {
40        let mut members = vec![];
41        for member in self.election_client.get_members().await? {
42            let host_addr = member
43                .id
44                .parse::<HostAddr>()
45                .map_err(|err| Status::from_error(err.into()))?;
46            members.push(MetaMember {
47                address: Some(HostAddress {
48                    host: host_addr.host,
49                    port: host_addr.port.into(),
50                }),
51                is_leader: member.is_leader,
52            })
53        }
54
55        Ok(Response::new(MembersResponse { members }))
56    }
57}