risingwave_meta_service/
lib.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15#![feature(let_chains)]
16#![feature(impl_trait_in_assoc_type)]
17#![feature(coverage_attribute)]
18
19use risingwave_meta::*;
20
21pub mod backup_service;
22pub mod cloud_service;
23pub mod cluster_limit_service;
24pub mod cluster_service;
25pub mod ddl_service;
26pub mod event_log_service;
27pub mod health_service;
28pub mod heartbeat_service;
29pub mod hosted_iceberg_catalog_service;
30pub mod hummock_service;
31pub mod meta_member_service;
32pub mod monitor_service;
33pub mod notification_service;
34pub mod scale_service;
35pub mod serving_service;
36pub mod session_config;
37pub mod sink_coordination_service;
38pub mod stream_service;
39pub mod system_params_service;
40pub mod telemetry_service;
41pub mod user_service;
42
43use std::pin::Pin;
44use std::task::{Context, Poll};
45
46use futures::Stream;
47use tokio::sync::mpsc::UnboundedReceiver;
48
49use crate::MetaError;
50
51/// `RwReceiverStream` is a wrapper around `tokio::sync::mpsc::UnboundedReceiver` that implements
52/// Stream. `RwReceiverStream` is similar to `tokio_stream::wrappers::ReceiverStream`, but it
53/// maps Result<S, `MetaError`> to Result<S, `tonic::Status`>.
54pub struct RwReceiverStream<S: Send + Sync + 'static> {
55    inner: UnboundedReceiver<Result<S, MetaError>>,
56}
57
58impl<S: Send + Sync + 'static> RwReceiverStream<S> {
59    pub fn new(inner: UnboundedReceiver<Result<S, MetaError>>) -> Self {
60        Self { inner }
61    }
62}
63
64impl<S: Send + Sync + 'static> Stream for RwReceiverStream<S> {
65    type Item = Result<S, tonic::Status>;
66
67    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
68        self.inner
69            .poll_recv(cx)
70            .map(|opt| opt.map(|res| res.map_err(Into::into)))
71    }
72}
73
74use std::net::SocketAddr;
75
/// Network addresses grouped into a single value.
///
/// `Debug` is derived so the value can be printed in logs and assertion messages.
#[derive(Clone, Debug)]
pub struct AddressInfo {
    /// Advertised address, kept as an unparsed string (the `Default` value is empty).
    // NOTE(review): presumably the address other nodes use to reach this one — confirm.
    pub advertise_addr: String,
    /// Socket address to listen on.
    pub listen_addr: SocketAddr,
    /// Optional Prometheus socket address; `None` by default.
    pub prometheus_addr: Option<SocketAddr>,
    /// Optional dashboard socket address; `None` by default.
    pub dashboard_addr: Option<SocketAddr>,
}
83impl Default for AddressInfo {
84    fn default() -> Self {
85        Self {
86            advertise_addr: "".to_owned(),
87            listen_addr: SocketAddr::V4("127.0.0.1:0000".parse().unwrap()),
88            prometheus_addr: None,
89            dashboard_addr: None,
90        }
91    }
92}