risingwave_connector/source/kafka/
private_link.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::str::FromStr;
17
18use anyhow::{Context, anyhow};
19use itertools::Itertools;
20use rdkafka::client::BrokerAddr;
21use risingwave_common::bail;
22use risingwave_common::util::addr::HostAddr;
23use risingwave_common::util::iter_util::ZipEqFast;
24use risingwave_pb::catalog::connection::PrivateLinkService;
25use serde_derive::Deserialize;
26
27use crate::connector_common::{
28    AwsPrivateLinkItem, PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
29};
30use crate::error::ConnectorResult;
31use crate::source::kafka::{KAFKA_PROPS_BROKER_KEY, KAFKA_PROPS_BROKER_KEY_ALIAS};
32
/// Property key for the private link endpoint.
///
/// The value is either a plain host (shared by every broker) or a JSON array of
/// `{"host": "..."}` objects, one per broker.
pub const PRIVATELINK_ENDPOINT_KEY: &str = "privatelink.endpoint";
34
35#[derive(Debug)]
36pub(super) enum PrivateLinkContextRole {
37    Consumer,
38    #[expect(dead_code)]
39    Producer,
40}
41
42#[derive(Debug, Deserialize)]
43struct PrivateLinkEndpointItem {
44    host: String,
45}
46
47impl std::fmt::Display for PrivateLinkContextRole {
48    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
49        match self {
50            PrivateLinkContextRole::Consumer => write!(f, "consumer"),
51            PrivateLinkContextRole::Producer => write!(f, "producer"),
52        }
53    }
54}
55
56pub(super) struct BrokerAddrRewriter {
57    #[expect(dead_code)]
58    role: PrivateLinkContextRole,
59    rewrite_map: BTreeMap<BrokerAddr, BrokerAddr>,
60}
61
62impl BrokerAddrRewriter {
63    pub(super) fn rewrite_broker_addr(&self, addr: BrokerAddr) -> BrokerAddr {
64        match self.rewrite_map.get(&addr) {
65            None => addr,
66            Some(new_addr) => new_addr.clone(),
67        }
68    }
69
70    pub fn new(
71        role: PrivateLinkContextRole,
72        broker_rewrite_map: Option<BTreeMap<String, String>>,
73    ) -> ConnectorResult<Self> {
74        let rewrite_map: ConnectorResult<BTreeMap<BrokerAddr, BrokerAddr>> = broker_rewrite_map
75            .map_or(Ok(BTreeMap::new()), |addr_map| {
76                tracing::info!("[{}] rewrite map {:?}", role, addr_map);
77                addr_map
78                    .into_iter()
79                    .map(|(old_addr, new_addr)| {
80                        let old_addr = HostAddr::from_str(&old_addr)?;
81                        let new_addr = HostAddr::from_str(&new_addr)?;
82                        let old_addr = BrokerAddr {
83                            host: old_addr.host,
84                            port: old_addr.port.to_string(),
85                        };
86                        let new_addr = BrokerAddr {
87                            host: new_addr.host,
88                            port: new_addr.port.to_string(),
89                        };
90                        Ok((old_addr, new_addr))
91                    })
92                    .collect()
93            });
94        let rewrite_map = rewrite_map?;
95        Ok(Self { role, rewrite_map })
96    }
97}
98
99#[inline(always)]
100fn kafka_props_broker_key(with_properties: &BTreeMap<String, String>) -> &str {
101    if with_properties.contains_key(KAFKA_PROPS_BROKER_KEY) {
102        KAFKA_PROPS_BROKER_KEY
103    } else {
104        KAFKA_PROPS_BROKER_KEY_ALIAS
105    }
106}
107
108#[inline(always)]
109fn get_property_required(
110    with_properties: &BTreeMap<String, String>,
111    property: &str,
112) -> ConnectorResult<String> {
113    with_properties
114        .get(property)
115        .map(|s| s.to_lowercase())
116        .with_context(|| format!("Required property \"{property}\" is not provided"))
117        .map_err(Into::into)
118}
119
120pub fn insert_privatelink_broker_rewrite_map(
121    with_options: &mut BTreeMap<String, String>,
122    svc: Option<&PrivateLinkService>,
123    privatelink_endpoint: Option<String>,
124) -> ConnectorResult<()> {
125    let mut broker_rewrite_map = HashMap::new();
126    let servers = get_property_required(with_options, kafka_props_broker_key(with_options))?;
127    let broker_addrs = servers.split(',').collect_vec();
128    let link_target_value = get_property_required(with_options, PRIVATE_LINK_TARGETS_KEY)?;
129    let link_targets: Vec<AwsPrivateLinkItem> =
130        serde_json::from_str(link_target_value.as_str()).map_err(|e| anyhow!(e))?;
131    // remove the private link targets from WITH options, as they are useless after we constructed the rewrite mapping
132    with_options.remove(PRIVATE_LINK_TARGETS_KEY);
133
134    if broker_addrs.len() != link_targets.len() {
135        bail!(
136            "The number of broker addrs {} does not match the number of private link targets {}",
137            broker_addrs.len(),
138            link_targets.len()
139        );
140    }
141
142    if let Some(endpoint) = privatelink_endpoint {
143        // new syntax: endpoint can either be a string or a json array of strings
144        // if it is a string, rewrite all broker addresses to the same endpoint
145        // eg. privatelink.endpoint='some_url' ==> broker1:9092 -> some_url:9092, broker2:9093 -> some_url:9093
146        // if it is a json array, rewrite each broker address to the corresponding endpoint
147        // eg. privatelink.endpoint = '[{"host": "aaaa"}, {"host": "bbbb"}, {"host": "cccc"}]'
148        // ==> broker1:9092 -> aaaa:9092, broker2:9093 -> bbbb:9093, broker3:9094 -> cccc:9094
149        handle_privatelink_endpoint(
150            &endpoint,
151            &mut broker_rewrite_map,
152            &link_targets,
153            &broker_addrs,
154        )?;
155    } else {
156        if svc.is_none() {
157            bail!("Privatelink endpoint not found.");
158        }
159        let svc = svc.unwrap();
160        for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.into_iter()) {
161            if svc.dns_entries.is_empty() {
162                bail!(
163                    "No available private link endpoints for Kafka broker {}",
164                    broker
165                );
166            }
167            // rewrite the broker address to the dns name w/o az
168            // requires the NLB has enabled the cross-zone load balancing
169            broker_rewrite_map.insert(
170                broker.to_owned(),
171                format!("{}:{}", &svc.endpoint_dns_name, link.port),
172            );
173        }
174    }
175
176    // save private link dns names into source properties, which
177    // will be extracted into KafkaProperties
178    let json = serde_json::to_string(&broker_rewrite_map).map_err(|e| anyhow!(e))?;
179    with_options.insert(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY.to_owned(), json);
180    Ok(())
181}
182
183fn handle_privatelink_endpoint(
184    endpoint: &str,
185    broker_rewrite_map: &mut HashMap<String, String>,
186    link_targets: &[AwsPrivateLinkItem],
187    broker_addrs: &[&str],
188) -> ConnectorResult<()> {
189    let endpoint = if let Ok(json) = serde_json::from_str::<serde_json::Value>(endpoint) {
190        json
191    } else {
192        serde_json::Value::String(endpoint.to_owned())
193    };
194    if matches!(endpoint, serde_json::Value::String(_)) {
195        let endpoint = endpoint.as_str().unwrap();
196        for (link, broker) in link_targets.iter().zip_eq_fast(broker_addrs.iter()) {
197            // rewrite the broker address to endpoint:port
198            broker_rewrite_map.insert(broker.to_string(), format!("{}:{}", endpoint, link.port));
199        }
200    } else if matches!(endpoint, serde_json::Value::Array(_)) {
201        let endpoint_list: Vec<PrivateLinkEndpointItem> = endpoint
202            .as_array()
203            .unwrap()
204            .iter()
205            .map(|v| {
206                serde_json::from_value(v.clone()).map_err(|_| {
207                    anyhow!(
208                        "expect json schema {{\"host\": \"endpoint url\"}} but got {}",
209                        v
210                    )
211                })
212            })
213            .collect::<Result<Vec<_>, _>>()?;
214        for ((link, broker), endpoint) in link_targets
215            .iter()
216            .zip_eq_fast(broker_addrs.iter())
217            .zip_eq_fast(endpoint_list.iter())
218        {
219            // rewrite the broker address to endpoint:port
220            broker_rewrite_map.insert(
221                broker.to_string(),
222                format!("{}:{}", endpoint.host, link.port),
223            );
224        }
225    } else {
226        bail!(
227            "expect a string or a json array for privatelink.endpoint, but got {:?}",
228            endpoint
229        )
230    }
231
232    Ok(())
233}
234
#[cfg(test)]
mod tests {
    use super::*;

    /// Runs `handle_privatelink_endpoint` on a fresh map and returns the populated map.
    fn rewrite(
        endpoint: &str,
        link_targets: &[AwsPrivateLinkItem],
        broker_addrs: &[&str],
    ) -> ConnectorResult<HashMap<String, String>> {
        let mut broker_rewrite_map = HashMap::new();
        handle_privatelink_endpoint(endpoint, &mut broker_rewrite_map, link_targets, broker_addrs)?;
        Ok(broker_rewrite_map)
    }

    /// Three brokers paired with AZ-agnostic targets on ports 9092, 9093 and 9094.
    fn three_brokers() -> (Vec<AwsPrivateLinkItem>, Vec<&'static str>) {
        let link_targets = [9092, 9093, 9094]
            .into_iter()
            .map(|port| AwsPrivateLinkItem { az_id: None, port })
            .collect();
        (
            link_targets,
            vec!["broker1:9092", "broker2:9093", "broker3:9094"],
        )
    }

    #[test]
    fn test_handle_privatelink_endpoint_raw_string() {
        let link_targets: Vec<AwsPrivateLinkItem> = [9092, 9093]
            .into_iter()
            .map(|port| AwsPrivateLinkItem { az_id: None, port })
            .collect();
        let map = rewrite("some_url", &link_targets, &["broker1:9092", "broker2:9093"]).unwrap();

        assert_eq!(map.len(), 2);
        assert_eq!(map["broker1:9092"], "some_url:9092");
        assert_eq!(map["broker2:9093"], "some_url:9093");
    }

    #[test]
    fn test_handle_privatelink_endpoint_json_array() {
        let (link_targets, broker_addrs) = three_brokers();
        let endpoint = r#"[{"host": "aaaa"}, {"host": "bbbb"}, {"host": "cccc"}]"#;
        let map = rewrite(endpoint, &link_targets, &broker_addrs).unwrap();

        assert_eq!(map.len(), 3);
        assert_eq!(map["broker1:9092"], "aaaa:9092");
        assert_eq!(map["broker2:9093"], "bbbb:9093");
        assert_eq!(map["broker3:9094"], "cccc:9094");
    }

    #[test]
    fn test_handle_privatelink_endpoint_missing_host() {
        let (link_targets, broker_addrs) = three_brokers();
        let endpoint = r#"[{"somekey_1": "aaaa"}, {"somekey_2": "bbbb"}, {"somekey_3": "cccc"}]"#;
        let err = rewrite(endpoint, &link_targets, &broker_addrs).unwrap_err();

        assert_eq!(
            err.to_string(),
            "expect json schema {\"host\": \"endpoint url\"} but got {\"somekey_1\":\"aaaa\"}"
        );
    }

    #[test]
    fn test_handle_privatelink_endpoint_illegal_json() {
        let (link_targets, broker_addrs) = three_brokers();
        let err = rewrite(r#"{}"#, &link_targets, &broker_addrs).unwrap_err();

        assert_eq!(
            err.to_string(),
            "expect a string or a json array for privatelink.endpoint, but got Object {}"
        );
    }
}