risingwave_common/util/
meta_addr.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::{self, Formatter};
16use std::str::FromStr;
17
18use itertools::Itertools;
19
/// Prefix that marks a meta address string as being in load-balance mode.
///
/// It is stripped from the input when parsing and written back in front of the
/// address when formatting.
const META_ADDRESS_LOAD_BALANCE_MODE_PREFIX: &str = "load-balance+";
21
22/// The strategy for meta client to connect to meta node.
23///
24/// Used in the command line argument `--meta-address`.
25#[derive(Debug, Eq, PartialEq, Clone)]
26pub enum MetaAddressStrategy {
27    LoadBalance(http::Uri),
28    List(Vec<http::Uri>),
29}
30
31/// Error type for parsing meta address strategy.
32#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)]
33pub enum MetaAddressStrategyParseError {
34    #[error("empty meta addresses")]
35    Empty,
36    #[error("there should be only one load balance address")]
37    MultipleLoadBalance,
38    #[error("failed to parse meta address `{1}`: {0}")]
39    UrlParse(#[source] http::uri::InvalidUri, String),
40}
41
42impl FromStr for MetaAddressStrategy {
43    type Err = MetaAddressStrategyParseError;
44
45    fn from_str(meta_addr: &str) -> Result<Self, Self::Err> {
46        if let Some(addr) = meta_addr.strip_prefix(META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) {
47            let addr = addr
48                .split(',')
49                .exactly_one()
50                .map_err(|_| MetaAddressStrategyParseError::MultipleLoadBalance)?;
51
52            let uri = addr.parse().into_url_parse(addr)?;
53
54            Ok(Self::LoadBalance(uri))
55        } else {
56            let addrs = meta_addr.split(',').peekable();
57
58            let uris: Vec<_> = addrs
59                .map(|addr| addr.parse().into_url_parse(addr))
60                .try_collect()?;
61
62            if uris.is_empty() {
63                return Err(MetaAddressStrategyParseError::Empty);
64            }
65
66            Ok(Self::List(uris))
67        }
68    }
69}
70
71impl fmt::Display for MetaAddressStrategy {
72    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
73        match self {
74            MetaAddressStrategy::LoadBalance(addr) => {
75                write!(f, "{}{}", META_ADDRESS_LOAD_BALANCE_MODE_PREFIX, addr)?;
76            }
77            MetaAddressStrategy::List(addrs) => {
78                write!(f, "{}", addrs.iter().format(","))?;
79            }
80        }
81        Ok(())
82    }
83}
84
85impl MetaAddressStrategy {
86    /// Returns `Some` if there's exactly one address.
87    pub fn exactly_one(&self) -> Option<&http::Uri> {
88        match self {
89            MetaAddressStrategy::LoadBalance(lb) => Some(lb),
90            MetaAddressStrategy::List(list) => {
91                if list.len() == 1 {
92                    list.first()
93                } else {
94                    None
95                }
96            }
97        }
98    }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Checks accepted and rejected meta address strings against their expected
    /// parse results.
    #[test]
    fn test_parse_meta_addr() {
        use super::MetaAddressStrategy::{List, LoadBalance};

        // Builds an expected URI from a literal known to be valid.
        let uri = |s: &str| s.parse().unwrap();

        // Each case pairs an input with its expected strategy; `None` means the
        // input must be rejected.
        let cases = [
            (
                "load-balance+http://abc",
                Some(LoadBalance(uri("http://abc"))),
            ),
            ("load-balance+http://abc,http://def", None),
            ("", None),
            ("http://abc", Some(List(vec![uri("http://abc")]))),
            (
                "http://abc,http://def",
                Some(List(vec![uri("http://abc"), uri("http://def")])),
            ),
        ];

        for (input, expected) in cases {
            let parsed = input.parse::<MetaAddressStrategy>();
            match expected {
                Some(strategy) => assert_eq!(strategy, parsed.unwrap()),
                None => assert!(parsed.is_err(), "{parsed:?}"),
            }
        }
    }
}