risingwave_common/util/
meta_addr.rs1use std::fmt::{self, Formatter};
16use std::str::FromStr;
17
18use itertools::Itertools;
19
/// Prefix that marks a meta address string as a single load-balanced address.
///
/// It is stripped when parsing a [`MetaAddressStrategy`] and written back when displaying the
/// `LoadBalance` variant.
const META_ADDRESS_LOAD_BALANCE_MODE_PREFIX: &str = "load-balance+";
21
22#[derive(Debug, Eq, PartialEq, Clone)]
26pub enum MetaAddressStrategy {
27 LoadBalance(http::Uri),
28 List(Vec<http::Uri>),
29}
30
/// Errors produced when parsing a [`MetaAddressStrategy`] from a string.
// NOTE(review): `thiserror_ext::ContextInto` presumably generates the `into_url_parse` helper
// called in `from_str` — confirm against the `thiserror_ext` documentation.
#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)]
pub enum MetaAddressStrategyParseError {
    /// No meta address was given.
    #[error("empty meta addresses")]
    Empty,
    /// The load-balance form did not contain exactly one address.
    #[error("there should be only one load balance address")]
    MultipleLoadBalance,
    /// An address could not be parsed as a URI.
    ///
    /// The first field is the underlying parse error, the second is the offending address.
    #[error("failed to parse meta address `{1}`: {0}")]
    UrlParse(#[source] http::uri::InvalidUri, String),
}
41
42impl FromStr for MetaAddressStrategy {
43 type Err = MetaAddressStrategyParseError;
44
45 fn from_str(meta_addr: &str) -> Result<Self, Self::Err> {
46 if let Some(addr) = meta_addr.strip_prefix(META_ADDRESS_LOAD_BALANCE_MODE_PREFIX) {
47 let addr = Itertools::exactly_one(addr.split(','))
51 .map_err(|_| MetaAddressStrategyParseError::MultipleLoadBalance)?;
52
53 let uri = addr.parse().into_url_parse(addr)?;
54
55 Ok(Self::LoadBalance(uri))
56 } else {
57 let addrs = meta_addr.split(',').peekable();
58
59 let uris: Vec<_> = addrs
60 .map(|addr| addr.parse().into_url_parse(addr))
61 .try_collect()?;
62
63 if uris.is_empty() {
64 return Err(MetaAddressStrategyParseError::Empty);
65 }
66
67 Ok(Self::List(uris))
68 }
69 }
70}
71
72impl fmt::Display for MetaAddressStrategy {
73 fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
74 match self {
75 MetaAddressStrategy::LoadBalance(addr) => {
76 write!(f, "{}{}", META_ADDRESS_LOAD_BALANCE_MODE_PREFIX, addr)?;
77 }
78 MetaAddressStrategy::List(addrs) => {
79 write!(f, "{}", addrs.iter().format(","))?;
80 }
81 }
82 Ok(())
83 }
84}
85
86impl MetaAddressStrategy {
87 pub fn exactly_one(&self) -> Option<&http::Uri> {
89 match self {
90 MetaAddressStrategy::LoadBalance(lb) => Some(lb),
91 MetaAddressStrategy::List(list) => {
92 if list.len() == 1 {
93 list.first()
94 } else {
95 None
96 }
97 }
98 }
99 }
100}
101
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an expected URI from a literal that is known to be valid.
    fn uri(literal: &str) -> http::Uri {
        literal.parse().unwrap()
    }

    #[test]
    fn test_parse_meta_addr() {
        // Each case pairs an input with the expected strategy, or `None` when parsing must fail.
        let cases = [
            (
                "load-balance+http://abc",
                Some(MetaAddressStrategy::LoadBalance(uri("http://abc"))),
            ),
            ("load-balance+http://abc,http://def", None),
            ("", None),
            (
                "http://abc",
                Some(MetaAddressStrategy::List(vec![uri("http://abc")])),
            ),
            (
                "http://abc,http://def",
                Some(MetaAddressStrategy::List(vec![
                    uri("http://abc"),
                    uri("http://def"),
                ])),
            ),
        ];

        for (input, expected) in cases {
            let parsed: Result<MetaAddressStrategy, _> = input.parse();
            if let Some(strategy) = expected {
                assert_eq!(strategy, parsed.unwrap());
            } else {
                assert!(parsed.is_err(), "{parsed:?}");
            }
        }
    }
}