risingwave_common/util/
meta_addr.rs1use std::fmt::{self, Formatter};
16use std::str::FromStr;
17
18use itertools::Itertools;
19
/// Prefix that selects load-balance mode: `FromStr` strips it from the start of the input to
/// produce `MetaAddressStrategy::LoadBalance`, and `Display` writes it back for that variant.
const META_ADDRESS_LOAD_BALANCE_MODE_PREFIX: &str = "load-balance+";
21
/// How a meta address string was specified.
///
/// Values are built by the `FromStr` impl and rendered back to text by the `Display` impl.
#[derive(Debug, Eq, PartialEq, Clone)]
pub enum MetaAddressStrategy {
    /// A single address, written as `load-balance+<uri>`.
    // NOTE(review): presumably the address of a load balancer in front of the meta nodes —
    // confirm against the callers.
    LoadBalance(http::Uri),
    /// Addresses written as a comma-separated list, e.g. `http://abc,http://def`.
    List(Vec<http::Uri>),
}
30
/// Errors produced when parsing a [`MetaAddressStrategy`] from a string.
// NOTE(review): the `into_url_parse` helper used by the `FromStr` impl is presumably generated
// by the `thiserror_ext::ContextInto` derive for the `UrlParse` variant — confirm.
#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)]
pub enum MetaAddressStrategyParseError {
    /// No meta address was given.
    #[error("empty meta addresses")]
    Empty,
    /// The load-balance form was not followed by exactly one address.
    #[error("there should be only one load balance address")]
    MultipleLoadBalance,
    /// An individual address is not a valid URI. Field `0` is the underlying parse error and
    /// field `1` is the offending address text, as used by the error message.
    #[error("failed to parse meta address `{1}`: {0}")]
    UrlParse(#[source] http::uri::InvalidUri, String),
}
41
42impl FromStr for MetaAddressStrategy {
43 type Err = MetaAddressStrategyParseError;
44
45 fn from_str(meta_addr: &str) -> Result<Self, Self::Err> {
46 if let Some(addr) = meta_addr.strip_prefix(META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) {
47 let addr = addr
48 .split(',')
49 .exactly_one()
50 .map_err(|_| MetaAddressStrategyParseError::MultipleLoadBalance)?;
51
52 let uri = addr.parse().into_url_parse(addr)?;
53
54 Ok(Self::LoadBalance(uri))
55 } else {
56 let addrs = meta_addr.split(',').peekable();
57
58 let uris: Vec<_> = addrs
59 .map(|addr| addr.parse().into_url_parse(addr))
60 .try_collect()?;
61
62 if uris.is_empty() {
63 return Err(MetaAddressStrategyParseError::Empty);
64 }
65
66 Ok(Self::List(uris))
67 }
68 }
69}
70
71impl fmt::Display for MetaAddressStrategy {
72 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
73 match self {
74 MetaAddressStrategy::LoadBalance(addr) => {
75 write!(f, "{}{}", META_ADDRESS_LOAD_BALANCE_MODE_PREFIX, addr)?;
76 }
77 MetaAddressStrategy::List(addrs) => {
78 write!(f, "{}", addrs.iter().format(","))?;
79 }
80 }
81 Ok(())
82 }
83}
84
85impl MetaAddressStrategy {
86 pub fn exactly_one(&self) -> Option<&http::Uri> {
88 match self {
89 MetaAddressStrategy::LoadBalance(lb) => Some(lb),
90 MetaAddressStrategy::List(list) => {
91 if list.len() == 1 {
92 list.first()
93 } else {
94 None
95 }
96 }
97 }
98 }
99}
100
#[cfg(test)]
mod tests {
    use super::*;

    /// Table-driven check of `MetaAddressStrategy::from_str`: each input is paired with the
    /// expected strategy, or `None` when parsing must fail.
    #[test]
    fn test_parse_meta_addr() {
        // Helper for building the expected URIs.
        let uri = |text: &str| text.parse::<http::Uri>().unwrap();

        let cases = [
            (
                "load-balance+http://abc",
                Some(MetaAddressStrategy::LoadBalance(uri("http://abc"))),
            ),
            ("load-balance+http://abc,http://def", None),
            ("", None),
            (
                "http://abc",
                Some(MetaAddressStrategy::List(vec![uri("http://abc")])),
            ),
            (
                "http://abc,http://def",
                Some(MetaAddressStrategy::List(vec![
                    uri("http://abc"),
                    uri("http://def"),
                ])),
            ),
        ];

        for (input, expected) in cases {
            let parsed = input.parse::<MetaAddressStrategy>();
            if let Some(strategy) = expected {
                assert_eq!(strategy, parsed.unwrap());
            } else {
                assert!(parsed.is_err(), "{parsed:?}");
            }
        }
    }
}