1use std::collections::BTreeMap;
16use std::hash::Hash;
17use std::io::Write;
18use std::path::Path;
19use std::sync::{Arc, LazyLock, Weak};
20use std::time::Duration;
21
22use anyhow::{Context, anyhow};
23use async_nats::jetstream::consumer::DeliverPolicy;
24use async_nats::jetstream::{self};
25use aws_sdk_kinesis::Client as KinesisClient;
26use aws_sdk_kinesis::config::{AsyncSleep, SharedAsyncSleep, Sleep};
27use moka::future::Cache as MokaCache;
28use moka::ops::compute::Op;
29use phf::{Set, phf_set};
30use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
31use pulsar::{Authentication, Pulsar, TokioExecutor};
32use rdkafka::ClientConfig;
33use risingwave_common::bail;
34use rustls_pki_types::pem::PemObject;
35use rustls_pki_types::{CertificateDer, PrivatePkcs8KeyDer};
36use serde::Deserialize;
37use serde_with::json::JsonString;
38use serde_with::{DisplayFromStr, serde_as};
39use tempfile::NamedTempFile;
40use time::OffsetDateTime;
41use url::Url;
42use with_options::WithOptions;
43
44use crate::aws_utils::load_file_descriptor_from_s3;
45use crate::deserialize_duration_from_string;
46use crate::enforce_secret::EnforceSecret;
47use crate::error::ConnectorResult;
48use crate::sink::SinkError;
49use crate::source::nats::source::NatsOffset;
50
/// Property key `broker.rewrite.endpoints`. The same string is the serde name of
/// `KafkaPrivateLinkCommon::broker_rewrite_map` in this file.
pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
/// Property key `privatelink.targets`.
// NOTE(review): no reader of this key is visible in this file; confirm usage at the call sites.
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

/// SASL mechanism value that `KafkaConnectionProps::is_aws_msk_iam` compares against.
const AWS_MSK_IAM_AUTH: &str = "AWS_MSK_IAM";

/// Name passed to `env_var_is_true` by `AwsAuthProps::build_credential_provider`. When it
/// evaluates to true and no explicit access/secret key pair is configured, that method returns
/// an error instead of falling back to the SDK default credential provider.
pub const DISABLE_DEFAULT_CREDENTIAL: &str = "DISABLE_DEFAULT_CREDENTIAL";
59
/// A single deserializable PrivateLink entry consisting of an optional `az_id` and a `port`.
// NOTE(review): `az_id` presumably names an AWS availability zone; no consumer of this type is
// visible in this file, so confirm against the call sites.
#[derive(Debug, Clone, Deserialize)]
pub struct AwsPrivateLinkItem {
    pub az_id: Option<String>,
    pub port: u16,
}
65
66use aws_config::default_provider::region::DefaultRegionChain;
67use aws_config::sts::AssumeRoleProvider;
68use aws_credential_types::provider::SharedCredentialsProvider;
69use aws_types::SdkConfig;
70use aws_types::region::Region;
71use risingwave_common::util::env_var::env_var_is_true;
72
/// AWS connection and authentication options deserialized from string-keyed properties.
///
/// Every field accepts its primary `aws.*` key plus the aliases listed in its serde attribute.
/// `AwsAuthProps::build_config` turns these options into an `SdkConfig`.
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
    /// Region name. When absent, `build_region` consults the SDK default region chain.
    #[serde(rename = "aws.region", alias = "region", alias = "s3.region")]
    pub region: Option<String>,

    /// Optional endpoint URL; `build_config` applies it to the config loader when set.
    #[serde(
        rename = "aws.endpoint_url",
        alias = "endpoint_url",
        alias = "endpoint",
        alias = "s3.endpoint"
    )]
    pub endpoint: Option<String>,
    /// Access key id. Static credentials are used only when `secret_key` is set as well.
    #[serde(
        rename = "aws.credentials.access_key_id",
        alias = "access_key",
        alias = "s3.access.key"
    )]
    pub access_key: Option<String>,
    /// Secret access key. Static credentials are used only when `access_key` is set as well.
    #[serde(
        rename = "aws.credentials.secret_access_key",
        alias = "secret_key",
        alias = "s3.secret.key"
    )]
    pub secret_key: Option<String>,
    /// Optional session token passed along with the static key pair.
    #[serde(rename = "aws.credentials.session_token", alias = "session_token")]
    pub session_token: Option<String>,
    /// Role identifier handed to `AssumeRoleProvider::builder` by `with_role_provider`.
    #[serde(rename = "aws.credentials.role.arn", alias = "arn")]
    pub arn: Option<String>,
    /// External id attached to the assume-role request when `arn` is set.
    #[serde(rename = "aws.credentials.role.external_id", alias = "external_id")]
    pub external_id: Option<String>,
    /// Profile name used by the default region chain when `region` is not set.
    #[serde(rename = "aws.profile", alias = "profile")]
    pub profile: Option<String>,
    // NOTE(review): not read anywhere in this file; presumably a timeout in seconds for the MSK
    // IAM token signer. Confirm at the usage site.
    #[serde(rename = "aws.msk.signer_timeout_sec")]
    pub msk_signer_timeout_sec: Option<u64>,
}
111
/// Registers the credential-bearing property keys of `AwsAuthProps` (access key, secret key and
/// session token, under their primary names and aliases) in `ENFORCE_SECRET_PROPERTIES`.
// NOTE(review): how the `EnforceSecret` trait consumes this set is defined outside this file.
impl EnforceSecret for AwsAuthProps {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "access_key",
        "aws.credentials.access_key_id",
        "s3.access.key",
        "secret_key",
        "aws.credentials.secret_access_key",
        "s3.secret.key",
        "session_token",
        "aws.credentials.session_token",
    };
}
124
125impl AwsAuthProps {
126 async fn build_region(&self) -> ConnectorResult<Region> {
127 if let Some(region_name) = &self.region {
128 Ok(Region::new(region_name.clone()))
129 } else {
130 let mut region_chain = DefaultRegionChain::builder();
131 if let Some(profile_name) = &self.profile {
132 region_chain = region_chain.profile_name(profile_name);
133 }
134
135 Ok(region_chain
136 .build()
137 .region()
138 .await
139 .context("region should be provided")?)
140 }
141 }
142
143 async fn build_credential_provider(&self) -> ConnectorResult<SharedCredentialsProvider> {
144 if self.access_key.is_some() && self.secret_key.is_some() {
145 Ok(SharedCredentialsProvider::new(
146 aws_credential_types::Credentials::from_keys(
147 self.access_key.as_ref().unwrap(),
148 self.secret_key.as_ref().unwrap(),
149 self.session_token.clone(),
150 ),
151 ))
152 } else if !env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
153 Ok(SharedCredentialsProvider::new(
154 aws_config::default_provider::credentials::default_provider().await,
155 ))
156 } else {
157 bail!("Both \"access_key\" and \"secret_key\" are required.")
158 }
159 }
160
161 async fn with_role_provider(
162 &self,
163 credential: SharedCredentialsProvider,
164 ) -> ConnectorResult<SharedCredentialsProvider> {
165 if let Some(role_name) = &self.arn {
166 let region = self.build_region().await?;
167 let mut role = AssumeRoleProvider::builder(role_name)
168 .session_name("RisingWave")
169 .region(region);
170 if let Some(id) = &self.external_id {
171 role = role.external_id(id);
172 }
173 let provider = role.build_from_provider(credential).await;
174 Ok(SharedCredentialsProvider::new(provider))
175 } else {
176 Ok(credential)
177 }
178 }
179
180 pub async fn build_config(&self) -> ConnectorResult<SdkConfig> {
181 let region = self.build_region().await?;
182 let credentials_provider = self
183 .with_role_provider(self.build_credential_provider().await?)
184 .await?;
185 let mut config_loader = aws_config::from_env()
186 .region(region)
187 .credentials_provider(credentials_provider);
188
189 if let Some(endpoint) = self.endpoint.as_ref() {
190 config_loader = config_loader.endpoint_url(endpoint);
191 }
192
193 Ok(config_loader.load().await)
194 }
195}
196
/// Kafka connection options: the bootstrap servers plus the SSL and SASL settings.
///
/// Apart from `brokers`, every field is private and optional. The fields are applied to an
/// rdkafka `ClientConfig` by `KafkaConnectionProps::set_security_properties`, each under its
/// serde name with the `properties.` prefix removed.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnectionProps {
    /// Bootstrap server list.
    #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
    pub brokers: String,

    #[serde(rename = "properties.security.protocol")]
    #[with_option(allow_alter_on_fly)]
    security_protocol: Option<String>,

    #[serde(rename = "properties.ssl.endpoint.identification.algorithm")]
    #[with_option(allow_alter_on_fly)]
    ssl_endpoint_identification_algorithm: Option<String>,

    #[serde(rename = "properties.ssl.ca.location")]
    ssl_ca_location: Option<String>,

    #[serde(rename = "properties.ssl.ca.pem")]
    ssl_ca_pem: Option<String>,

    #[serde(rename = "properties.ssl.certificate.location")]
    ssl_certificate_location: Option<String>,

    #[serde(rename = "properties.ssl.certificate.pem")]
    ssl_certificate_pem: Option<String>,

    #[serde(rename = "properties.ssl.key.location")]
    ssl_key_location: Option<String>,

    #[serde(rename = "properties.ssl.key.pem")]
    ssl_key_pem: Option<String>,

    #[serde(rename = "properties.ssl.key.password")]
    ssl_key_password: Option<String>,

    /// SASL mechanism. The value `AWS_MSK_IAM` is special-cased by `is_aws_msk_iam`.
    #[serde(rename = "properties.sasl.mechanism")]
    #[with_option(allow_alter_on_fly)]
    sasl_mechanism: Option<String>,

    #[serde(rename = "properties.sasl.username")]
    #[with_option(allow_alter_on_fly)]
    sasl_username: Option<String>,

    #[serde(rename = "properties.sasl.password")]
    #[with_option(allow_alter_on_fly)]
    sasl_password: Option<String>,

    #[serde(rename = "properties.sasl.kerberos.service.name")]
    sasl_kerberos_service_name: Option<String>,

    #[serde(rename = "properties.sasl.kerberos.keytab")]
    sasl_kerberos_keytab: Option<String>,

    #[serde(rename = "properties.sasl.kerberos.principal")]
    sasl_kerberos_principal: Option<String>,

    #[serde(rename = "properties.sasl.kerberos.kinit.cmd")]
    sasl_kerberos_kinit_cmd: Option<String>,

    #[serde(rename = "properties.sasl.kerberos.min.time.before.relogin")]
    sasl_kerberos_min_time_before_relogin: Option<String>,

    // The field name spells "oath" while the property key spells "oauth"; only the key is
    // user-visible.
    #[serde(rename = "properties.sasl.oauthbearer.config")]
    sasl_oathbearer_config: Option<String>,
}
281
/// Registers the SSL private key (PEM), the SSL key password and the SASL password property
/// keys of `KafkaConnectionProps` in `ENFORCE_SECRET_PROPERTIES`.
impl EnforceSecret for KafkaConnectionProps {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "properties.ssl.key.pem",
        "properties.ssl.key.password",
        "properties.sasl.password",
    };
}
289
/// Kafka options shared by this crate's Kafka connectors: the topic and the timeout for
/// synchronous calls.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
    /// Topic name (`topic`, alias `kafka.topic`).
    #[serde(rename = "topic", alias = "kafka.topic")]
    pub topic: String,

    /// Timeout parsed by `deserialize_duration_from_string`; falls back to
    /// `default_kafka_sync_call_timeout` (5 seconds) when the property is absent.
    #[serde(
        rename = "properties.sync.call.timeout",
        deserialize_with = "deserialize_duration_from_string",
        default = "default_kafka_sync_call_timeout"
    )]
    #[with_option(allow_alter_on_fly)]
    pub sync_call_timeout: Duration,
}
305
/// Optional broker rewrite map for Kafka.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaPrivateLinkCommon {
    /// String-to-string map supplied as a JSON-encoded string under the key
    /// `broker.rewrite.endpoints` (decoded via `JsonString`).
    #[serde(rename = "broker.rewrite.endpoints")]
    #[serde_as(as = "Option<JsonString>")]
    pub broker_rewrite_map: Option<BTreeMap<String, String>>,
}
314
/// Serde default for `KafkaCommon::sync_call_timeout`: five seconds.
const fn default_kafka_sync_call_timeout() -> Duration {
    const SYNC_CALL_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(SYNC_CALL_TIMEOUT_SECS)
}
318
/// Serde default for `RdKafkaPropertiesCommon::socket_keepalive_enable`: keepalive is on.
const fn default_socket_keepalive_enable() -> bool {
    const KEEPALIVE_ENABLED_BY_DEFAULT: bool = true;
    KEEPALIVE_ENABLED_BY_DEFAULT
}
322
/// librdkafka tuning options shared by this crate's Kafka connectors.
///
/// Values arrive as strings and are parsed with `DisplayFromStr`.
/// `RdKafkaPropertiesCommon::set_client` copies them onto an rdkafka `ClientConfig` under the
/// serde name with the `properties.` prefix removed.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesCommon {
    #[serde(rename = "properties.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub message_max_bytes: Option<usize>,

    #[serde(rename = "properties.receive.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub receive_message_max_bytes: Option<usize>,

    #[serde(rename = "properties.statistics.interval.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub statistics_interval_ms: Option<usize>,

    #[serde(rename = "properties.client.id")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub client_id: Option<String>,

    #[serde(rename = "properties.enable.ssl.certificate.verification")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    #[with_option(allow_alter_on_fly)]
    pub enable_ssl_certificate_verification: Option<bool>,

    /// Always forwarded to the client config; defaults to `true` when the property is absent.
    #[serde(
        rename = "properties.socket.keepalive.enable",
        default = "default_socket_keepalive_enable"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub socket_keepalive_enable: bool,
}
367
368impl RdKafkaPropertiesCommon {
369 pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
370 if let Some(v) = self.statistics_interval_ms {
371 c.set("statistics.interval.ms", v.to_string());
372 }
373 if let Some(v) = self.message_max_bytes {
374 c.set("message.max.bytes", v.to_string());
375 }
376 if let Some(v) = self.receive_message_max_bytes {
377 c.set("receive.message.max.bytes", v.to_string());
378 }
379 if let Some(v) = self.client_id.as_ref() {
380 c.set("client.id", v);
381 }
382 if let Some(v) = self.enable_ssl_certificate_verification {
383 c.set("enable.ssl.certificate.verification", v.to_string());
384 }
385 c.set(
386 "socket.keepalive.enable",
387 self.socket_keepalive_enable.to_string(),
388 );
389 }
390}
391
392impl KafkaConnectionProps {
393 #[cfg(test)]
394 pub fn test_default() -> Self {
395 Self {
396 brokers: "localhost:9092".to_owned(),
397 security_protocol: None,
398 ssl_ca_location: None,
399 ssl_certificate_location: None,
400 ssl_key_location: None,
401 ssl_ca_pem: None,
402 ssl_certificate_pem: None,
403 ssl_key_pem: None,
404 ssl_key_password: None,
405 ssl_endpoint_identification_algorithm: None,
406 sasl_mechanism: None,
407 sasl_username: None,
408 sasl_password: None,
409 sasl_kerberos_service_name: None,
410 sasl_kerberos_keytab: None,
411 sasl_kerberos_principal: None,
412 sasl_kerberos_kinit_cmd: None,
413 sasl_kerberos_min_time_before_relogin: None,
414 sasl_oathbearer_config: None,
415 }
416 }
417
418 pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
419 if self.is_aws_msk_iam() {
421 config.set("security.protocol", "SASL_SSL");
422 config.set("sasl.mechanism", "OAUTHBEARER");
423 return;
424 }
425
426 if let Some(security_protocol) = self.security_protocol.as_ref() {
428 config.set("security.protocol", security_protocol);
429 }
430
431 if let Some(ssl_ca_location) = self.ssl_ca_location.as_ref() {
433 config.set("ssl.ca.location", ssl_ca_location);
434 }
435 if let Some(ssl_ca_pem) = self.ssl_ca_pem.as_ref() {
436 config.set("ssl.ca.pem", ssl_ca_pem);
437 }
438 if let Some(ssl_certificate_location) = self.ssl_certificate_location.as_ref() {
439 config.set("ssl.certificate.location", ssl_certificate_location);
440 }
441 if let Some(ssl_certificate_pem) = self.ssl_certificate_pem.as_ref() {
442 config.set("ssl.certificate.pem", ssl_certificate_pem);
443 }
444 if let Some(ssl_key_location) = self.ssl_key_location.as_ref() {
445 config.set("ssl.key.location", ssl_key_location);
446 }
447 if let Some(ssl_key_pem) = self.ssl_key_pem.as_ref() {
448 config.set("ssl.key.pem", ssl_key_pem);
449 }
450 if let Some(ssl_key_password) = self.ssl_key_password.as_ref() {
451 config.set("ssl.key.password", ssl_key_password);
452 }
453 if let Some(ssl_endpoint_identification_algorithm) =
454 self.ssl_endpoint_identification_algorithm.as_ref()
455 {
456 config.set(
458 "ssl.endpoint.identification.algorithm",
459 ssl_endpoint_identification_algorithm,
460 );
461 }
462
463 if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref() {
465 config.set("sasl.mechanism", sasl_mechanism);
466 }
467
468 if let Some(sasl_username) = self.sasl_username.as_ref() {
470 config.set("sasl.username", sasl_username);
471 }
472 if let Some(sasl_password) = self.sasl_password.as_ref() {
473 config.set("sasl.password", sasl_password);
474 }
475
476 if let Some(sasl_kerberos_service_name) = self.sasl_kerberos_service_name.as_ref() {
478 config.set("sasl.kerberos.service.name", sasl_kerberos_service_name);
479 }
480 if let Some(sasl_kerberos_keytab) = self.sasl_kerberos_keytab.as_ref() {
481 config.set("sasl.kerberos.keytab", sasl_kerberos_keytab);
482 }
483 if let Some(sasl_kerberos_principal) = self.sasl_kerberos_principal.as_ref() {
484 config.set("sasl.kerberos.principal", sasl_kerberos_principal);
485 }
486 if let Some(sasl_kerberos_kinit_cmd) = self.sasl_kerberos_kinit_cmd.as_ref() {
487 config.set("sasl.kerberos.kinit.cmd", sasl_kerberos_kinit_cmd);
488 }
489 if let Some(sasl_kerberos_min_time_before_relogin) =
490 self.sasl_kerberos_min_time_before_relogin.as_ref()
491 {
492 config.set(
493 "sasl.kerberos.min.time.before.relogin",
494 sasl_kerberos_min_time_before_relogin,
495 );
496 }
497
498 if let Some(sasl_oathbearer_config) = self.sasl_oathbearer_config.as_ref() {
500 config.set("sasl.oauthbearer.config", sasl_oathbearer_config);
501 }
502 config.set("enable.sasl.oauthbearer.unsecure.jwt", "true");
504 }
505
506 pub(crate) fn is_aws_msk_iam(&self) -> bool {
507 if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref()
508 && sasl_mechanism == AWS_MSK_IAM_AUTH
509 {
510 true
511 } else {
512 false
513 }
514 }
515}
516
/// Pulsar connection options: topic, service URL and an optional authentication token.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PulsarCommon {
    #[serde(rename = "topic", alias = "pulsar.topic")]
    pub topic: String,

    /// URL passed to `Pulsar::builder` by `PulsarCommon::build_client`.
    #[serde(rename = "service.url", alias = "pulsar.service.url")]
    pub service_url: String,

    /// Token used for "token" authentication; only applied when no OAuth options are supplied.
    #[serde(rename = "auth.token")]
    pub auth_token: Option<String>,
}
528
/// Registers the Pulsar auth token property key in `ENFORCE_SECRET_PROPERTIES`.
// NOTE(review): the key listed here is "pulsar.auth.token", while `PulsarCommon::auth_token` is
// deserialized from "auth.token" and declares no alias. Confirm whether "auth.token" should be
// registered as well; the matching logic lives in the `EnforceSecret` trait outside this file.
impl EnforceSecret for PulsarCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "pulsar.auth.token",
    };
}
534
/// OAuth2 client-credentials options for Pulsar; mapped onto `OAuth2Params` by
/// `PulsarCommon::build_client`.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PulsarOauthCommon {
    #[serde(rename = "oauth.issuer.url")]
    pub issuer_url: String,

    /// Location of the credentials file: an `s3://` or `file://` URL, or an absolute file path
    /// (see `PulsarCommon::resolve_pulsar_credentials_url`).
    #[serde(rename = "oauth.credentials.url")]
    pub credentials_url: String,

    #[serde(rename = "oauth.audience")]
    pub audience: String,

    #[serde(rename = "oauth.scope")]
    pub scope: Option<String>,
}
549
550fn create_credential_temp_file(credentials: &[u8]) -> std::io::Result<NamedTempFile> {
551 let mut f = NamedTempFile::new()?;
552 f.write_all(credentials)?;
553 f.as_file().sync_all()?;
554 Ok(f)
555}
556
557impl PulsarCommon {
558 pub(crate) async fn build_client(
559 &self,
560 oauth: &Option<PulsarOauthCommon>,
561 aws_auth_props: &AwsAuthProps,
562 ) -> ConnectorResult<Pulsar<TokioExecutor>> {
563 let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor);
564 let mut _temp_file = None; if let Some(oauth) = oauth.as_ref() {
567 let (credentials_url, temp_file) = self
568 .resolve_pulsar_credentials_url(oauth, aws_auth_props)
569 .await?;
570 _temp_file = temp_file;
571
572 let auth_params = OAuth2Params {
573 issuer_url: oauth.issuer_url.clone(),
574 credentials_url,
575 audience: Some(oauth.audience.clone()),
576 scope: oauth.scope.clone(),
577 };
578
579 pulsar_builder = pulsar_builder
580 .with_auth_provider(OAuth2Authentication::client_credentials(auth_params));
581 } else if let Some(auth_token) = &self.auth_token {
582 pulsar_builder = pulsar_builder.with_auth(Authentication {
583 name: "token".to_owned(),
584 data: Vec::from(auth_token.as_str()),
585 });
586 }
587
588 let res = pulsar_builder.build().await.map_err(|e| anyhow!(e))?;
589 drop(_temp_file); Ok(res)
591 }
592
593 pub(crate) async fn resolve_pulsar_credentials_url(
594 &self,
595 oauth: &PulsarOauthCommon,
596 aws_auth_props: &AwsAuthProps,
597 ) -> ConnectorResult<(String, Option<NamedTempFile>)> {
598 if let Ok(url) = Url::parse(&oauth.credentials_url) {
600 return self
601 .handle_pulsar_credentials_url(&url, aws_auth_props)
602 .await;
603 }
604
605 let path = Path::new(&oauth.credentials_url);
607 if !path.is_absolute() {
608 bail!("credentials_url must be a valid URL (s3://, file://) or an absolute file path");
609 }
610
611 if !tokio::fs::try_exists(&oauth.credentials_url)
613 .await
614 .unwrap_or(false)
615 {
616 bail!("credentials file does not exist: {}", oauth.credentials_url);
617 }
618
619 Ok((format!("file://{}", oauth.credentials_url), None))
621 }
622
623 pub(crate) async fn handle_pulsar_credentials_url(
624 &self,
625 url: &Url,
626 aws_auth_props: &AwsAuthProps,
627 ) -> ConnectorResult<(String, Option<NamedTempFile>)> {
628 match url.scheme() {
629 "s3" => {
630 let credentials = load_file_descriptor_from_s3(url, aws_auth_props).await?;
631 let temp_file = create_credential_temp_file(&credentials)
632 .context("failed to create temp file for pulsar credentials")?;
633
634 let temp_path = temp_file
635 .path()
636 .to_str()
637 .context("temp file path is not valid UTF-8")?;
638
639 Ok((format!("file://{}", temp_path), Some(temp_file)))
640 }
641 "file" => Ok((url.to_string(), None)),
642 _ => bail!(
643 "invalid credentials_url scheme '{}', only file://, s3://, and absolute file paths are supported",
644 url.scheme()
645 ),
646 }
647 }
648}
649
/// Kinesis stream, credential and SDK tuning options.
///
/// The credential fields are forwarded to an `AwsAuthProps` by `KinesisCommon::build_client`.
/// The `kinesis.sdk.*` options arrive as strings (parsed via `DisplayFromStr`) and configure the
/// SDK client's timeouts and retry policy.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct KinesisCommon {
    #[serde(rename = "stream", alias = "kinesis.stream.name")]
    pub stream_name: String,
    #[serde(rename = "aws.region", alias = "kinesis.stream.region")]
    pub stream_region: String,
    #[serde(rename = "endpoint", alias = "kinesis.endpoint")]
    pub endpoint: Option<String>,
    #[serde(
        rename = "aws.credentials.access_key_id",
        alias = "kinesis.credentials.access"
    )]
    pub credentials_access_key: Option<String>,
    #[serde(
        rename = "aws.credentials.secret_access_key",
        alias = "kinesis.credentials.secret"
    )]
    pub credentials_secret_access_key: Option<String>,
    #[serde(
        rename = "aws.credentials.session_token",
        alias = "kinesis.credentials.session_token"
    )]
    pub session_token: Option<String>,
    #[serde(rename = "aws.credentials.role.arn", alias = "kinesis.assumerole.arn")]
    pub assume_role_arn: Option<String>,
    #[serde(
        rename = "aws.credentials.role.external_id",
        alias = "kinesis.assumerole.external_id"
    )]
    pub assume_role_external_id: Option<String>,

    /// Connect timeout in milliseconds; defaults to 10000.
    #[serde(
        rename = "kinesis.sdk.connect_timeout_ms",
        default = "kinesis_default_connect_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_connect_timeout_ms: u64,

    /// Read timeout in milliseconds; defaults to 10000.
    #[serde(
        rename = "kinesis.sdk.read_timeout_ms",
        default = "kinesis_default_read_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_read_timeout_ms: u64,

    /// Operation timeout in milliseconds; defaults to 10000.
    #[serde(
        rename = "kinesis.sdk.operation_timeout_ms",
        default = "kinesis_default_operation_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_operation_timeout_ms: u64,

    /// Per-attempt operation timeout in milliseconds; defaults to 10000.
    #[serde(
        rename = "kinesis.sdk.operation_attempt_timeout_ms",
        default = "kinesis_default_operation_attempt_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_operation_attempt_timeout_ms: u64,

    /// Passed to the retry config's `with_max_attempts`; defaults to 3.
    #[serde(
        rename = "kinesis.sdk.max_retry_limit",
        default = "kinesis_default_max_retry_limit"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_max_retry_limit: u32,

    /// Initial retry backoff in milliseconds; defaults to 1000.
    #[serde(
        rename = "kinesis.sdk.init_backoff_ms",
        default = "kinesis_default_init_backoff_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_init_backoff_ms: u64,

    /// Maximum retry backoff in milliseconds; defaults to 20000.
    #[serde(
        rename = "kinesis.sdk.max_backoff_ms",
        default = "kinesis_default_max_backoff_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_max_backoff_ms: u64,
}
732
/// Tokio-based `AsyncSleep` implementation installed on the Kinesis SDK client by
/// `KinesisCommon::build_client`.
#[derive(Debug)]
pub struct KinesisAsyncSleepImpl;
735
736impl AsyncSleep for KinesisAsyncSleepImpl {
737 fn sleep(&self, duration: Duration) -> Sleep {
738 Sleep::new(async move { tokio::time::sleep(duration).await })
739 }
740}
741
/// Serde default for `kinesis.sdk.connect_timeout_ms`: 10 000 ms.
const fn kinesis_default_connect_timeout_ms() -> u64 {
    10_000
}
745
/// Serde default for `kinesis.sdk.read_timeout_ms`: 10 000 ms.
const fn kinesis_default_read_timeout_ms() -> u64 {
    10_000
}
749
/// Serde default for `kinesis.sdk.operation_timeout_ms`: 10 000 ms.
const fn kinesis_default_operation_timeout_ms() -> u64 {
    10_000
}
753
/// Serde default for `kinesis.sdk.operation_attempt_timeout_ms`: 10 000 ms.
const fn kinesis_default_operation_attempt_timeout_ms() -> u64 {
    10_000
}
757
/// Serde default for `kinesis.sdk.init_backoff_ms`: 1 000 ms.
const fn kinesis_default_init_backoff_ms() -> u64 {
    1_000
}
761
/// Serde default for `kinesis.sdk.max_backoff_ms`: 20 000 ms.
const fn kinesis_default_max_backoff_ms() -> u64 {
    20_000
}
765
/// Serde default for `kinesis.sdk.max_retry_limit`: 3.
const fn kinesis_default_max_retry_limit() -> u32 {
    const DEFAULT_MAX_RETRY_LIMIT: u32 = 3;
    DEFAULT_MAX_RETRY_LIMIT
}
769
/// Registers the Kinesis credential property keys in `ENFORCE_SECRET_PROPERTIES`.
// NOTE(review): only the `kinesis.credentials.*` aliases are listed. The primary serde names of
// the same fields (`aws.credentials.access_key_id`, `aws.credentials.secret_access_key`,
// `aws.credentials.session_token`) are not, unlike the `AwsAuthProps` impl in this file, which
// lists primary names and aliases. Confirm whether that is intentional.
impl EnforceSecret for KinesisCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "kinesis.credentials.access",
        "kinesis.credentials.secret",
        "kinesis.credentials.session_token",
    };
}
777
778impl KinesisCommon {
779 pub(crate) async fn build_client(&self) -> ConnectorResult<KinesisClient> {
780 let config = AwsAuthProps {
781 region: Some(self.stream_region.clone()),
782 endpoint: self.endpoint.clone(),
783 access_key: self.credentials_access_key.clone(),
784 secret_key: self.credentials_secret_access_key.clone(),
785 session_token: self.session_token.clone(),
786 arn: self.assume_role_arn.clone(),
787 external_id: self.assume_role_external_id.clone(),
788 profile: Default::default(),
789 msk_signer_timeout_sec: Default::default(),
790 };
791 let aws_config = config.build_config().await?;
792 let mut builder = aws_sdk_kinesis::config::Builder::from(&aws_config);
793 {
794 let sleep_impl = SharedAsyncSleep::new(KinesisAsyncSleepImpl);
796 builder.set_sleep_impl(Some(sleep_impl));
797 let timeout_config = aws_smithy_types::timeout::TimeoutConfig::builder()
798 .connect_timeout(Duration::from_millis(self.sdk_connect_timeout_ms))
799 .read_timeout(Duration::from_millis(self.sdk_read_timeout_ms))
800 .operation_timeout(Duration::from_millis(self.sdk_operation_timeout_ms))
801 .operation_attempt_timeout(Duration::from_millis(
802 self.sdk_operation_attempt_timeout_ms,
803 ))
804 .build();
805 builder.set_timeout_config(Some(timeout_config));
806
807 let retry_config = aws_smithy_types::retry::RetryConfig::standard()
808 .with_initial_backoff(Duration::from_millis(self.sdk_init_backoff_ms))
809 .with_max_backoff(Duration::from_millis(self.sdk_max_backoff_ms))
810 .with_max_attempts(self.sdk_max_retry_limit);
811 builder.set_retry_config(Some(retry_config));
812 }
813 if let Some(endpoint) = &config.endpoint {
814 builder = builder.endpoint_url(endpoint);
815 }
816 Ok(KinesisClient::from_conf(builder.build()))
817 }
818}
819
/// The connection-identifying subset of `NatsCommon`, produced by
/// `NatsCommon::connection_props` and used as the key of `SHARED_NATS_CLIENT`.
///
/// Note that the derived `Debug` output includes `password`, `jwt` and `nkey`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct NatsConnectionProps {
    pub server_url: String,
    pub connect_mode: String,
    pub user: Option<String>,
    pub password: Option<String>,
    pub jwt: Option<String>,
    pub nkey: Option<String>,
}
831
/// Process-wide cache of NATS clients keyed by their connection properties.
///
/// Entries are `Weak` handles. `NatsCommon::build_client` upgrades and reuses an entry while the
/// client is still alive and connected, and replaces it otherwise.
pub static SHARED_NATS_CLIENT: LazyLock<MokaCache<NatsConnectionProps, Weak<async_nats::Client>>> =
    LazyLock::new(|| MokaCache::builder().build());
841
/// NATS connection, subject and stream-creation options.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct NatsCommon {
    /// One or more server addresses, comma-separated.
    #[serde(rename = "server_url")]
    pub server_url: String,
    /// One or more subjects, comma-separated.
    #[serde(rename = "subject")]
    pub subject: String,
    /// Authentication mode: `user_and_password`, `credential` or `plain`.
    #[serde(rename = "connect_mode")]
    pub connect_mode: String,
    /// Required when `connect_mode` is `user_and_password`.
    #[serde(rename = "username")]
    pub user: Option<String>,
    /// Required when `connect_mode` is `user_and_password`.
    #[serde(rename = "password")]
    pub password: Option<String>,
    /// Required when `connect_mode` is `credential`.
    #[serde(rename = "jwt")]
    pub jwt: Option<String>,
    /// Required when `connect_mode` is `credential`.
    #[serde(rename = "nkey")]
    pub nkey: Option<String>,
    // The `max_*` options below are applied only when `build_or_get_stream` creates a new stream.
    #[serde(rename = "max_bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_bytes: Option<i64>,
    #[serde(rename = "max_messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_messages: Option<i64>,
    #[serde(rename = "max_messages_per_subject")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_messages_per_subject: Option<i64>,
    #[serde(rename = "max_consumers")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_consumers: Option<i32>,
    #[serde(rename = "max_message_size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_message_size: Option<i32>,
    /// Whether a missing stream may be created; defaults to `false` when the option is absent.
    #[serde(rename = "allow_create_stream", default)]
    #[serde_as(as = "DisplayFromStr")]
    pub allow_create_stream: bool,
}
878
/// Registers the NATS `password`, `jwt` and `nkey` property keys in
/// `ENFORCE_SECRET_PROPERTIES`.
impl EnforceSecret for NatsCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "password",
        "jwt",
        "nkey",
    };
}
886
887impl NatsCommon {
888 pub fn connection_props(&self) -> NatsConnectionProps {
890 NatsConnectionProps {
891 server_url: self.server_url.clone(),
892 connect_mode: self.connect_mode.clone(),
893 user: self.user.clone(),
894 password: self.password.clone(),
895 jwt: self.jwt.clone(),
896 nkey: self.nkey.clone(),
897 }
898 }
899
900 async fn build_client_inner(&self) -> ConnectorResult<async_nats::Client> {
902 let mut connect_options = async_nats::ConnectOptions::new();
903 match self.connect_mode.as_str() {
904 "user_and_password" => {
905 if let (Some(v_user), Some(v_password)) =
906 (self.user.as_ref(), self.password.as_ref())
907 {
908 connect_options =
909 connect_options.user_and_password(v_user.into(), v_password.into())
910 } else {
911 bail!("nats connect mode is user_and_password, but user or password is empty");
912 }
913 }
914
915 "credential" => {
916 if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) {
917 connect_options = connect_options
918 .credentials(&self.create_credential(v_nkey, v_jwt)?)
919 .expect("failed to parse static creds")
920 } else {
921 bail!("nats connect mode is credential, but nkey or jwt is empty");
922 }
923 }
924 "plain" => {}
925 _ => {
926 bail!("nats connect mode only accepts user_and_password/credential/plain");
927 }
928 };
929
930 let servers = self.server_url.split(',').collect::<Vec<&str>>();
931 let client = connect_options
932 .connect(
933 servers
934 .iter()
935 .map(|url| url.parse())
936 .collect::<Result<Vec<async_nats::ServerAddr>, _>>()?,
937 )
938 .await
939 .context("build nats client error")
940 .map_err(SinkError::Nats)?;
941 Ok(client)
942 }
943
944 pub(crate) async fn build_client(&self) -> ConnectorResult<Arc<async_nats::Client>> {
947 let connection_props = self.connection_props();
948 let mut client: Option<Arc<async_nats::Client>> = None;
949
950 SHARED_NATS_CLIENT
951 .entry_by_ref(&connection_props)
952 .and_try_compute_with::<_, _, crate::error::ConnectorError>(|maybe_entry| async {
953 if let Some(entry) = maybe_entry
954 && let entry_value = entry.into_value()
955 && let Some(existing_client) = entry_value.upgrade()
956 {
957 match existing_client.connection_state() {
958 async_nats::connection::State::Connected => {
959 tracing::info!("reuse existing nats client for {}", self.server_url);
960 client = Some(existing_client);
961 return Ok(Op::Nop);
962 }
963 _ => {
964 tracing::warn!(
965 server_url = self.server_url,
966 "existing nats client is not connected",
967 );
968 }
969 }
970 }
971 tracing::info!(
972 server_url = self.server_url,
973 "no cached client, or client disconnected, building new nats client"
974 );
975 let new_client = Arc::new(self.build_client_inner().await?);
976 client = Some(new_client.clone());
977 Ok(Op::Put(Arc::downgrade(&new_client)))
978 })
979 .await?;
980
981 Ok(client.expect("client should be set"))
982 }
983
984 pub(crate) async fn build_context(&self) -> ConnectorResult<jetstream::Context> {
985 let client = self.build_client().await?;
986 let jetstream = async_nats::jetstream::new((*client).clone());
987 Ok(jetstream)
988 }
989
990 pub(crate) fn build_context_from_client(
992 client: &Arc<async_nats::Client>,
993 ) -> jetstream::Context {
994 async_nats::jetstream::new((**client).clone())
995 }
996
997 pub(crate) async fn build_consumer(
1002 &self,
1003 stream: String,
1004 durable_consumer_name: String,
1005 split_id: String,
1006 start_sequence: NatsOffset,
1007 mut config: jetstream::consumer::pull::Config,
1008 existing_client: Option<Arc<async_nats::Client>>,
1009 ) -> ConnectorResult<(
1010 async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
1011 Arc<async_nats::Client>,
1012 )> {
1013 let client = match existing_client {
1014 Some(c) => c,
1015 None => self.build_client().await?,
1016 };
1017 let context = Self::build_context_from_client(&client);
1018 let stream = self.build_or_get_stream(context.clone(), stream).await?;
1019 let subject_name = self
1020 .subject
1021 .replace(',', "-")
1022 .replace(['.', '>', '*', ' ', '\t'], "_");
1023 let name = format!("risingwave-consumer-{}-{}", subject_name, split_id);
1024
1025 let deliver_policy = match start_sequence {
1026 NatsOffset::Earliest => DeliverPolicy::All,
1027 NatsOffset::Latest => DeliverPolicy::New,
1028 NatsOffset::SequenceNumber(v) => {
1029 let parsed = v
1031 .parse::<u64>()
1032 .context("failed to parse nats offset as sequence number")?;
1033 DeliverPolicy::ByStartSequence {
1034 start_sequence: 1 + parsed,
1035 }
1036 }
1037 NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
1038 start_time: OffsetDateTime::from_unix_timestamp_nanos(v as i128 * 1_000_000)
1039 .context("invalid timestamp for nats offset")?,
1040 },
1041 NatsOffset::None => DeliverPolicy::All,
1042 };
1043
1044 let consumer = match stream.get_consumer(&name).await {
1045 Ok(consumer) => consumer,
1046 _ => {
1047 stream
1048 .get_or_create_consumer(&name, {
1049 config.deliver_policy = deliver_policy;
1050 config.durable_name = Some(durable_consumer_name);
1051 config.filter_subjects =
1052 self.subject.split(',').map(|s| s.to_owned()).collect();
1053 config
1054 })
1055 .await?
1056 }
1057 };
1058 Ok((consumer, client))
1059 }
1060
1061 pub(crate) async fn build_or_get_stream(
1062 &self,
1063 jetstream: jetstream::Context,
1064 stream_str: String,
1065 ) -> ConnectorResult<jetstream::stream::Stream> {
1066 let subjects: Vec<String> = self.subject.split(',').map(|s| s.to_owned()).collect();
1067
1068 if let Ok(mut stream_instance) = jetstream.get_stream(&stream_str).await {
1071 tracing::info!(
1072 "load existing nats stream ({:?}) with config {:?}",
1073 stream_str,
1074 stream_instance.info().await?
1075 );
1076 return Ok(stream_instance);
1077 }
1078
1079 if !self.allow_create_stream {
1080 return Err(anyhow!(
1081 "stream {} not found, set `allow_create_stream` to true to create a stream",
1082 stream_str
1083 )
1084 .into());
1085 }
1086
1087 let mut config = jetstream::stream::Config {
1088 name: stream_str.clone(),
1089 max_bytes: 1000000,
1090 subjects,
1091 ..Default::default()
1092 };
1093 if let Some(v) = self.max_bytes {
1094 config.max_bytes = v;
1095 }
1096 if let Some(v) = self.max_messages {
1097 config.max_messages = v;
1098 }
1099 if let Some(v) = self.max_messages_per_subject {
1100 config.max_messages_per_subject = v;
1101 }
1102 if let Some(v) = self.max_consumers {
1103 config.max_consumers = v;
1104 }
1105 if let Some(v) = self.max_message_size {
1106 config.max_message_size = v;
1107 }
1108 tracing::info!(
1109 "create nats stream ({:?}) with config {:?}",
1110 &stream_str,
1111 config
1112 );
1113 let stream = jetstream.get_or_create_stream(config).await?;
1114 Ok(stream)
1115 }
1116
1117 pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> ConnectorResult<String> {
1118 let creds = format!(
1119 "-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\
1120 ************************* IMPORTANT *************************\n\
1121 NKEY Seed printed below can be used to sign and prove identity.\n\
1122 NKEYs are sensitive and should be treated as secrets.\n\n\
1123 -----BEGIN USER NKEY SEED-----\n{}\n------END USER NKEY SEED------\n\n\
1124 *************************************************************",
1125 jwt, seed
1126 );
1127 Ok(creds)
1128 }
1129}
1130
1131pub(crate) fn load_certs(
1132 certificates: &str,
1133) -> ConnectorResult<Vec<rustls_pki_types::CertificateDer<'static>>> {
1134 let cert_bytes = if let Some(path) = certificates.strip_prefix("fs://") {
1135 std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
1136 } else {
1137 certificates.as_bytes().to_owned()
1138 };
1139
1140 CertificateDer::pem_slice_iter(&cert_bytes)
1141 .collect::<Result<Vec<_>, _>>()
1142 .context("Failed to parse certificates")
1143 .map_err(Into::into)
1144}
1145
1146pub(crate) fn load_private_key(
1147 certificate: &str,
1148) -> ConnectorResult<rustls_pki_types::PrivateKeyDer<'static>> {
1149 let cert_bytes = if let Some(path) = certificate.strip_prefix("fs://") {
1150 std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
1151 } else {
1152 certificate.as_bytes().to_owned()
1153 };
1154
1155 let cert = PrivatePkcs8KeyDer::pem_slice_iter(&cert_bytes)
1156 .next()
1157 .ok_or_else(|| anyhow!("No private key found"))?
1158 .context("Failed to parse private key")?;
1159 Ok(cert.into())
1160}
1161
/// MongoDB connection options.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct MongodbCommon {
    /// Connection string passed to `mongodb::Client::with_uri_str`.
    #[serde(rename = "mongodb.url")]
    pub connect_uri: String,
    // NOTE(review): not read anywhere in this file; confirm the expected format at the usage
    // site.
    #[serde(rename = "collection.name")]
    pub collection_name: String,
}
1174
/// Registers the `mongodb.url` property key in `ENFORCE_SECRET_PROPERTIES`.
impl EnforceSecret for MongodbCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "mongodb.url"
    };
}
1180
1181impl MongodbCommon {
1182 pub(crate) async fn build_client(&self) -> ConnectorResult<mongodb::Client> {
1183 let client = mongodb::Client::with_uri_str(&self.connect_uri).await?;
1184
1185 Ok(client)
1186 }
1187}