risingwave_meta_service/
meta_member_service.rs1use risingwave_common::util::addr::HostAddr;
16use risingwave_meta::rpc::ElectionClientRef;
17use risingwave_pb::common::HostAddress;
18use risingwave_pb::meta::meta_member_service_server::MetaMemberService;
19use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember};
20use tonic::{Request, Response, Status};
21
22#[derive(Clone)]
23pub struct MetaMemberServiceImpl {
24 election_client: ElectionClientRef,
25}
26
27impl MetaMemberServiceImpl {
28 pub fn new(election_client: ElectionClientRef) -> Self {
29 MetaMemberServiceImpl { election_client }
30 }
31}
32
33#[async_trait::async_trait]
34impl MetaMemberService for MetaMemberServiceImpl {
35 async fn members(
36 &self,
37 _request: Request<MembersRequest>,
38 ) -> Result<Response<MembersResponse>, Status> {
39 let mut members = vec![];
40 for member in self.election_client.get_members().await? {
41 let host_addr = member
42 .id
43 .parse::<HostAddr>()
44 .map_err(|err| Status::from_error(err.into()))?;
45 members.push(MetaMember {
46 address: Some(HostAddress {
47 host: host_addr.host,
48 port: host_addr.port.into(),
49 }),
50 is_leader: member.is_leader,
51 })
52 }
53
54 Ok(Response::new(MembersResponse { members }))
55 }
56}