risingwave_meta_service/
meta_member_service.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use risingwave_common::util::addr::HostAddr;
16use risingwave_meta::rpc::ElectionClientRef;
17use risingwave_pb::common::HostAddress;
18use risingwave_pb::meta::meta_member_service_server::MetaMemberService;
19use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember};
20use tonic::{Request, Response, Status};
21
/// gRPC handler implementing [`MetaMemberService`].
///
/// Holds the election client that `members` queries to list the current
/// members and which of them is flagged as leader.
#[derive(Clone)]
pub struct MetaMemberServiceImpl {
    /// Election client queried via `get_members()` on each `members` call.
    election_client: ElectionClientRef,
}
26
27impl MetaMemberServiceImpl {
28    pub fn new(election_client: ElectionClientRef) -> Self {
29        MetaMemberServiceImpl { election_client }
30    }
31}
32
33#[async_trait::async_trait]
34impl MetaMemberService for MetaMemberServiceImpl {
35    async fn members(
36        &self,
37        _request: Request<MembersRequest>,
38    ) -> Result<Response<MembersResponse>, Status> {
39        let mut members = vec![];
40        for member in self.election_client.get_members().await? {
41            let host_addr = member
42                .id
43                .parse::<HostAddr>()
44                .map_err(|err| Status::from_error(err.into()))?;
45            members.push(MetaMember {
46                address: Some(HostAddress {
47                    host: host_addr.host,
48                    port: host_addr.port.into(),
49                }),
50                is_leader: member.is_leader,
51            })
52        }
53
54        Ok(Response::new(MembersResponse { members }))
55    }
56}