risingwave_connector/source/kafka/
private_link.rs1use std::collections::{BTreeMap, HashMap};
16use std::str::FromStr;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use rdkafka::client::BrokerAddr;
21use risingwave_common::bail;
22use risingwave_common::util::addr::HostAddr;
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_pb::catalog::connection::PrivateLinkService;
25use serde_derive::Deserialize;
26
27use crate::connector_common::{
28 AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
29};
30use crate::error::ConnectorResult;
31use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
32
33pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
34
35#[derive(Debug)]
36pub(super) enum PrivateLinkContextRole {
37 Consumer,
38 #[expect(dead_code)]
39 Producer,
40}
41
42#[derive(Debug, Deserialize)]
43struct PrivateLinkEndpointItem {
44 host: String,
45}
46
47impl std::fmt::Display for PrivateLinkContextRole {
48 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49 match self {
50 PrivateLinkContextRole::Consumer => write!(f, "consumer"),
51 PrivateLinkContextRole::Producer => write!(f, "producer"),
52 }
53 }
54}
55
56pub(super) struct BrokerAddrRewriter {
57 #[expect(dead_code)]
58 role: PrivateLinkContextRole,
59 rewrite_map: BTreeMap<BrokerAddr, BrokerAddr>,
60}
61
62impl BrokerAddrRewriter {
63 pub(super) fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
64 match self.rewrite_map.get(&addr) {
65 None => addr,
66 Some(new_addr) => new_addr.clone(),
67 }
68 }
69
70 pub fn new(
71 role: PrivateLinkContextRole,
72 broker_rewrite_map: Option<BTreeMap<String, String>>,
73 ) -> ConnectorResult<Self> {
74 let rewrite_map: ConnectorResult<BTreeMap<BrokerAddr, BrokerAddr>> = broker_rewrite_map
75 .map_or(Ok(BTreeMap::new()), |addr_map| {
76 tracing::info!("[{}] rewrite map {:?}", role, addr_map);
77 addr_map
78 .into_iter()
79 .map(|(old_addr, new_addr)| {
80 let old_addr = HostAddr::from_str(&old_addr)?;
81 let new_addr = HostAddr::from_str(&new_addr)?;
82 let old_addr = BrokerAddr {
83 host: old_addr.host,
84 port: old_addr.port.to_string(),
85 };
86 let new_addr = BrokerAddr {
87 host: new_addr.host,
88 port: new_addr.port.to_string(),
89 };
90 Ok((old_addr, new_addr))
91 })
92 .collect()
93 });
94 let rewrite_map = rewrite_map?;
95 Ok(Self { role, rewrite_map })
96 }
97}
98
99#[inline(always)]
100fn kafka_props_broker_key(with_properties: &BTreeMap<String, String>) -> &str {
101 if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) {
102 KAFKA_PROPS_BROKER_KEY
103 } else {
104 KAFKA_PROPS_BROKER_KEY_ALIAS
105 }
106}
107
108#[inline(always)]
109fn get_property_required(
110 with_properties: &BTreeMap<String, String>,
111 property: &str,
112) -> ConnectorResult<String> {
113 with_properties
114 .get(property)
115 .map(|s| s.to_lowercase())
116 .with_context(|| format!("Required property \"{property}\" is not provided"))
117 .map_err(Into::into)
118}
119
120pub fn insert_privatelink_broker_rewrite_map(
121 with_options: &mut BTreeMap<String, String>,
122 svc: Option<&PrivateLinkService>,
123 privatelink_endpoint: Option<String>,
124) -> ConnectorResult<()> {
125 let mut broker_rewrite_map = HashMap::new();
126 let servers = get_property_required(with_options, kafka_props_broker_key(with_options))?;
127 let broker_addrs = servers.split(',').collect_vec();
128 let link_target_value = get_property_required(with_options, PRIVATE_LINK_TARGETS_KEY)?;
129 let link_targets: Vec<AwsPrivateLinkItem> =
130 serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;
131 with_options.remove(PRIVATE_LINK_TARGETS_KEY);
133
134 if broker_addrs.len() != link_targets.len() {
135 bail!(
136 "The number of broker addrs {} does not match the number of private link targets {}",
137 broker_addrs.len(),
138 link_targets.len()
139 );
140 }
141
142 if let Some(endpoint) = privatelink_endpoint {
143 handle_privatelink_endpoint(
150 &endpoint,
151 &mut broker_rewrite_map,
152 &link_targets,
153 &broker_addrs,
154 )?;
155 } else {
156 if svc.is_none() {
157 bail!("Privatelink endpoint not found.");
158 }
159 let svc = svc.unwrap();
160 for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
161 if svc.dns_entries.is_empty() {
162 bail!(
163 "No available private link endpoints for Kafka broker {}",
164 broker
165 );
166 }
167 broker_rewrite_map.insert(
170 broker.to_owned(),
171 format!("{}:{}", &svc.endpoint_dns_name, link.port),
172 );
173 }
174 }
175
176 let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?;
179 with_options.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_owned(), json);
180 Ok(())
181}
182
183fn handle_privatelink_endpoint(
184 endpoint: &str,
185 broker_rewrite_map: &mut HashMap<String, String>,
186 link_targets: &[AwsPrivateLinkItem],
187 broker_addrs: &[&str],
188) -> ConnectorResult<()> {
189 let endpoint = if let Ok(json) = serde_json::from_str::<serde_json::Value>(endpoint) {
190 json
191 } else {
192 serde_json::Value::String(endpoint.to_owned())
193 };
194 if matches!(endpoint, serde_json::Value::String(_)) {
195 let endpoint = endpoint.as_str().unwrap();
196 for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.iter()) {
197 broker_rewrite_map.insert(broker.to_string(), format!("{}:{}", endpoint, link.port));
199 }
200 } else if matches!(endpoint, serde_json::Value::Array(_)) {
201 let endpoint_list: Vec<PrivateLinkEndpointItem> = endpoint
202 .as_array()
203 .unwrap()
204 .iter()
205 .map(|v| {
206 serde_json::from_value(v.clone()).map_err(|_| {
207 anyhow!(
208 "expect json schema {{\"host\": \"endpoint url\"}} but got {}",
209 v
210 )
211 })
212 })
213 .collect::<Result<Vec<_>, _>>()?;
214 for ((link, broker), endpoint) in link_targets
215 .iter()
216 .zip_eq_fast(broker_addrs.iter())
217 .zip_eq_fast(endpoint_list.iter())
218 {
219 broker_rewrite_map.insert(
221 broker.to_string(),
222 format!("{}:{}", endpoint.host, link.port),
223 );
224 }
225 } else {
226 bail!(
227 "expect a string or a json array for privatelink.endpoint, but got {:?}",
228 endpoint
229 )
230 }
231
232 Ok(())
233}
234
235#[cfg(test)]
236mod tests {
237 use super::*;
238
239 #[test]
240 fn test_handle_privatelink_endpoint() {
241 let endpoint = "some_url"; let link_targets = vec![
243 AwsPrivateLinkItem {
244 az_id: None,
245 port: 9092,
246 },
247 AwsPrivateLinkItem {
248 az_id: None,
249 port: 9093,
250 },
251 ];
252 let broker_addrs = vec!["broker1:9092", "broker2:9093"];
253 let mut broker_rewrite_map = HashMap::new();
254 handle_privatelink_endpoint(
255 endpoint,
256 &mut broker_rewrite_map,
257 &link_targets,
258 &broker_addrs,
259 )
260 .unwrap();
261
262 assert_eq!(broker_rewrite_map.len(), 2);
263 assert_eq!(broker_rewrite_map["broker1:9092"], "some_url:9092");
264 assert_eq!(broker_rewrite_map["broker2:9093"], "some_url:9093");
265
266 let endpoint = r#"[{"host": "aaaa"}, {"host": "bbbb"}, {"host": "cccc"}]"#;
268 let broker_addrs = vec!["broker1:9092", "broker2:9093", "broker3:9094"];
269 let link_targets = vec![
270 AwsPrivateLinkItem {
271 az_id: None,
272 port: 9092,
273 },
274 AwsPrivateLinkItem {
275 az_id: None,
276 port: 9093,
277 },
278 AwsPrivateLinkItem {
279 az_id: None,
280 port: 9094,
281 },
282 ];
283 let mut broker_rewrite_map = HashMap::new();
284 handle_privatelink_endpoint(
285 endpoint,
286 &mut broker_rewrite_map,
287 &link_targets,
288 &broker_addrs,
289 )
290 .unwrap();
291
292 assert_eq!(broker_rewrite_map.len(), 3);
293 assert_eq!(broker_rewrite_map["broker1:9092"], "aaaa:9092");
294 assert_eq!(broker_rewrite_map["broker2:9093"], "bbbb:9093");
295 assert_eq!(broker_rewrite_map["broker3:9094"], "cccc:9094");
296
297 let endpoint = r#"[{"somekey_1": "aaaa"}, {"somekey_2": "bbbb"}, {"somekey_3": "cccc"}]"#;
299 let mut broker_rewrite_map = HashMap::new();
300 let err = handle_privatelink_endpoint(
301 endpoint,
302 &mut broker_rewrite_map,
303 &link_targets,
304 &broker_addrs,
305 )
306 .unwrap_err();
307 assert_eq!(
308 err.to_string(),
309 "expect json schema {\"host\": \"endpoint url\"} but got {\"somekey_1\":\"aaaa\"}"
310 );
311
312 let endpoint = r#"{}"#;
314 let mut broker_rewrite_map = HashMap::new();
315 let err = handle_privatelink_endpoint(
316 endpoint,
317 &mut broker_rewrite_map,
318 &link_targets,
319 &broker_addrs,
320 )
321 .unwrap_err();
322 assert_eq!(
323 err.to_string(),
324 "expect a string or a json array for privatelink.endpoint, but got Object {}"
325 );
326 }
327}