risingwave_meta_service/
meta_member_service.rs1use risingwave_common::util::addr::HostAddr;
16use risingwave_meta::rpc::ElectionClientRef;
17use risingwave_pb::common::HostAddress;
18use risingwave_pb::meta::meta_member_service_server::MetaMemberService;
19use risingwave_pb::meta::{MembersRequest, MembersResponse, MetaMember};
20use tonic::{Request, Response, Status};
21
22#[derive(Clone)]
23pub struct MetaMemberServiceImpl {
24 election_client: ElectionClientRef,
25}
26
27impl MetaMemberServiceImpl {
28 pub fn new(election_client: ElectionClientRef) -> Self {
29 MetaMemberServiceImpl { election_client }
30 }
31}
32
33#[async_trait::async_trait]
34impl MetaMemberService for MetaMemberServiceImpl {
35 #[cfg_attr(coverage, coverage(off))]
36 async fn members(
37 &self,
38 _request: Request<MembersRequest>,
39 ) -> Result<Response<MembersResponse>, Status> {
40 let mut members = vec![];
41 for member in self.election_client.get_members().await? {
42 let host_addr = member
43 .id
44 .parse::<HostAddr>()
45 .map_err(|err| Status::from_error(err.into()))?;
46 members.push(MetaMember {
47 address: Some(HostAddress {
48 host: host_addr.host,
49 port: host_addr.port.into(),
50 }),
51 is_leader: member.is_leader,
52 })
53 }
54
55 Ok(Response::new(MembersResponse { members }))
56 }
57}