1use std::collections::BTreeMap;
16use std::hash::Hash;
17use std::io::Write;
18use std::time::Duration;
19
20use anyhow::{Context, anyhow};
21use async_nats::jetstream::consumer::DeliverPolicy;
22use async_nats::jetstream::{self};
23use aws_sdk_kinesis::Client as KinesisClient;
24use aws_sdk_kinesis::config::{AsyncSleep, SharedAsyncSleep, Sleep};
25use phf::{Set, phf_set};
26use pulsar::authentication::oauth2::{OAuth2Authentication, OAuth2Params};
27use pulsar::{Authentication, Pulsar, TokioExecutor};
28use rdkafka::ClientConfig;
29use risingwave_common::bail;
30use serde_derive::Deserialize;
31use serde_with::json::JsonString;
32use serde_with::{DisplayFromStr, serde_as};
33use tempfile::NamedTempFile;
34use time::OffsetDateTime;
35use url::Url;
36use with_options::WithOptions;
37
38use crate::aws_utils::load_file_descriptor_from_s3;
39use crate::deserialize_duration_from_string;
40use crate::enforce_secret::EnforceSecret;
41use crate::error::ConnectorResult;
42use crate::sink::SinkError;
43use crate::source::nats::source::NatsOffset;
44
/// Option key `broker.rewrite.endpoints`; the same string is the serde name of
/// `KafkaPrivateLinkCommon::broker_rewrite_map`.
pub const PRIVATE_LINK_BROKER_REWRITE_MAP_KEY: &str = "broker.rewrite.endpoints";
/// Option key `privatelink.targets`.
// NOTE(review): not referenced in the visible part of this file; presumably consumed by
// private-link handling elsewhere — confirm.
pub const PRIVATE_LINK_TARGETS_KEY: &str = "privatelink.targets";

/// Value of `properties.sasl.mechanism` that `KafkaConnectionProps::is_aws_msk_iam` compares
/// against.
const AWS_MSK_IAM_AUTH: &str = "AWS_MSK_IAM";

/// Name passed to `env_var_is_true` in `AwsAuthProps::build_credential_provider`: when it
/// evaluates to true and no explicit key pair is configured, building credentials fails instead
/// of falling back to the default AWS credential provider.
pub const DISABLE_DEFAULT_CREDENTIAL: &str = "DISABLE_DEFAULT_CREDENTIAL";
53
/// One deserializable private-link entry consisting of an optional `az_id` and a `port`.
#[derive(Debug, Clone, Deserialize)]
pub struct AwsPrivateLinkItem {
    /// Optional identifier; presumably the AWS availability-zone id — TODO confirm.
    pub az_id: Option<String>,
    /// Port number of the entry.
    pub port: u16,
}
59
60use aws_config::default_provider::region::DefaultRegionChain;
61use aws_config::sts::AssumeRoleProvider;
62use aws_credential_types::provider::SharedCredentialsProvider;
63use aws_types::SdkConfig;
64use aws_types::region::Region;
65use risingwave_common::util::env_var::env_var_is_true;
66
/// AWS authentication and endpoint options shared by connectors.
///
/// Every field is optional; [`AwsAuthProps::build_config`] turns them into an `SdkConfig`.
#[derive(Deserialize, Debug, Clone, WithOptions, PartialEq)]
pub struct AwsAuthProps {
    /// AWS region (`aws.region`, aliases `region` / `s3.region`). When unset, the region is
    /// resolved through the default region chain.
    #[serde(rename = "aws.region", alias = "region", alias = "s3.region")]
    pub region: Option<String>,

    /// Custom endpoint URL (`aws.endpoint_url`, aliases `endpoint_url` / `endpoint` /
    /// `s3.endpoint`).
    #[serde(
        rename = "aws.endpoint_url",
        alias = "endpoint_url",
        alias = "endpoint",
        alias = "s3.endpoint"
    )]
    pub endpoint: Option<String>,
    /// Static access key id; only used when `secret_key` is set as well.
    #[serde(
        rename = "aws.credentials.access_key_id",
        alias = "access_key",
        alias = "s3.access.key"
    )]
    pub access_key: Option<String>,
    /// Static secret access key; only used when `access_key` is set as well.
    #[serde(
        rename = "aws.credentials.secret_access_key",
        alias = "secret_key",
        alias = "s3.secret.key"
    )]
    pub secret_key: Option<String>,
    /// Optional session token passed along with the static key pair.
    #[serde(rename = "aws.credentials.session_token", alias = "session_token")]
    pub session_token: Option<String>,
    /// Role to assume on top of the base credentials (handed to `AssumeRoleProvider::builder`).
    #[serde(rename = "aws.credentials.role.arn", alias = "arn")]
    pub arn: Option<String>,
    /// External id supplied when assuming the role in `arn`.
    #[serde(rename = "aws.credentials.role.external_id", alias = "external_id")]
    pub external_id: Option<String>,
    /// Profile name given to the default region chain when `region` is not set.
    #[serde(rename = "aws.profile", alias = "profile")]
    pub profile: Option<String>,
    /// Option `aws.msk.signer_timeout_sec`.
    // NOTE(review): not read anywhere in the visible part of this file; presumably a timeout in
    // seconds for the MSK IAM token signer — confirm against its consumer.
    #[serde(rename = "aws.msk.signer_timeout_sec")]
    pub msk_signer_timeout_sec: Option<u64>,
}
105
/// Lists the option names of [`AwsAuthProps`] registered in `ENFORCE_SECRET_PROPERTIES`.
impl EnforceSecret for AwsAuthProps {
    // Covers the primary serde name and every alias of the access key, the secret key and the
    // session token. (`session_token` has no `s3.*` alias, hence only two entries for it.)
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "access_key",
        "aws.credentials.access_key_id",
        "s3.access.key",
        "secret_key",
        "aws.credentials.secret_access_key",
        "s3.secret.key",
        "session_token",
        "aws.credentials.session_token",
    };
}
118
119impl AwsAuthProps {
120 async fn build_region(&self) -> ConnectorResult<Region> {
121 if let Some(region_name) = &self.region {
122 Ok(Region::new(region_name.clone()))
123 } else {
124 let mut region_chain = DefaultRegionChain::builder();
125 if let Some(profile_name) = &self.profile {
126 region_chain = region_chain.profile_name(profile_name);
127 }
128
129 Ok(region_chain
130 .build()
131 .region()
132 .await
133 .context("region should be provided")?)
134 }
135 }
136
137 async fn build_credential_provider(&self) -> ConnectorResult<SharedCredentialsProvider> {
138 if self.access_key.is_some() && self.secret_key.is_some() {
139 Ok(SharedCredentialsProvider::new(
140 aws_credential_types::Credentials::from_keys(
141 self.access_key.as_ref().unwrap(),
142 self.secret_key.as_ref().unwrap(),
143 self.session_token.clone(),
144 ),
145 ))
146 } else if !env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
147 Ok(SharedCredentialsProvider::new(
148 aws_config::default_provider::credentials::default_provider().await,
149 ))
150 } else {
151 bail!("Both \"access_key\" and \"secret_key\" are required.")
152 }
153 }
154
155 async fn with_role_provider(
156 &self,
157 credential: SharedCredentialsProvider,
158 ) -> ConnectorResult<SharedCredentialsProvider> {
159 if let Some(role_name) = &self.arn {
160 let region = self.build_region().await?;
161 let mut role = AssumeRoleProvider::builder(role_name)
162 .session_name("RisingWave")
163 .region(region);
164 if let Some(id) = &self.external_id {
165 role = role.external_id(id);
166 }
167 let provider = role.build_from_provider(credential).await;
168 Ok(SharedCredentialsProvider::new(provider))
169 } else {
170 Ok(credential)
171 }
172 }
173
174 pub async fn build_config(&self) -> ConnectorResult<SdkConfig> {
175 let region = self.build_region().await?;
176 let credentials_provider = self
177 .with_role_provider(self.build_credential_provider().await?)
178 .await?;
179 let mut config_loader = aws_config::from_env()
180 .region(region)
181 .credentials_provider(credentials_provider);
182
183 if let Some(endpoint) = self.endpoint.as_ref() {
184 config_loader = config_loader.endpoint_url(endpoint);
185 }
186
187 Ok(config_loader.load().await)
188 }
189}
190
/// Kafka broker address and security (SSL / SASL) options.
///
/// Apart from `brokers`, each field is forwarded under the client property named in its doc by
/// `KafkaConnectionProps::set_security_properties`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaConnectionProps {
    /// Broker list (`properties.bootstrap.server`, alias `kafka.brokers`).
    #[serde(rename = "properties.bootstrap.server", alias = "kafka.brokers")]
    pub brokers: String,

    /// Forwarded as `security.protocol`.
    #[serde(rename = "properties.security.protocol")]
    security_protocol: Option<String>,

    /// Forwarded as `ssl.endpoint.identification.algorithm`.
    #[serde(rename = "properties.ssl.endpoint.identification.algorithm")]
    ssl_endpoint_identification_algorithm: Option<String>,

    /// Forwarded as `ssl.ca.location`.
    #[serde(rename = "properties.ssl.ca.location")]
    ssl_ca_location: Option<String>,

    /// Forwarded as `ssl.ca.pem`.
    #[serde(rename = "properties.ssl.ca.pem")]
    ssl_ca_pem: Option<String>,

    /// Forwarded as `ssl.certificate.location`.
    #[serde(rename = "properties.ssl.certificate.location")]
    ssl_certificate_location: Option<String>,

    /// Forwarded as `ssl.certificate.pem`.
    #[serde(rename = "properties.ssl.certificate.pem")]
    ssl_certificate_pem: Option<String>,

    /// Forwarded as `ssl.key.location`.
    #[serde(rename = "properties.ssl.key.location")]
    ssl_key_location: Option<String>,

    /// Forwarded as `ssl.key.pem`.
    #[serde(rename = "properties.ssl.key.pem")]
    ssl_key_pem: Option<String>,

    /// Forwarded as `ssl.key.password`.
    #[serde(rename = "properties.ssl.key.password")]
    ssl_key_password: Option<String>,

    /// Forwarded as `sasl.mechanism`. The value `AWS_MSK_IAM` is special-cased, see
    /// `KafkaConnectionProps::is_aws_msk_iam`.
    #[serde(rename = "properties.sasl.mechanism")]
    sasl_mechanism: Option<String>,

    /// Forwarded as `sasl.username`.
    #[serde(rename = "properties.sasl.username")]
    sasl_username: Option<String>,

    /// Forwarded as `sasl.password`.
    #[serde(rename = "properties.sasl.password")]
    sasl_password: Option<String>,

    /// Forwarded as `sasl.kerberos.service.name`.
    #[serde(rename = "properties.sasl.kerberos.service.name")]
    sasl_kerberos_service_name: Option<String>,

    /// Forwarded as `sasl.kerberos.keytab`.
    #[serde(rename = "properties.sasl.kerberos.keytab")]
    sasl_kerberos_keytab: Option<String>,

    /// Forwarded as `sasl.kerberos.principal`.
    #[serde(rename = "properties.sasl.kerberos.principal")]
    sasl_kerberos_principal: Option<String>,

    /// Forwarded as `sasl.kerberos.kinit.cmd`.
    #[serde(rename = "properties.sasl.kerberos.kinit.cmd")]
    sasl_kerberos_kinit_cmd: Option<String>,

    /// Forwarded as `sasl.kerberos.min.time.before.relogin`.
    #[serde(rename = "properties.sasl.kerberos.min.time.before.relogin")]
    sasl_kerberos_min_time_before_relogin: Option<String>,

    /// Forwarded as `sasl.oauthbearer.config`.
    // NOTE(review): the field name is spelled `oathbearer` (missing `u`); the option key itself
    // is spelled correctly.
    #[serde(rename = "properties.sasl.oauthbearer.config")]
    sasl_oathbearer_config: Option<String>,
}
270
/// Lists the option names of [`KafkaConnectionProps`] registered in
/// `ENFORCE_SECRET_PROPERTIES`: the SSL private key, its password, and the SASL password.
impl EnforceSecret for KafkaConnectionProps {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "properties.ssl.key.pem",
        "properties.ssl.key.password",
        "properties.sasl.password",
    };
}
278
/// Kafka options holding the topic name and the sync-call timeout.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct KafkaCommon {
    /// Topic name (`topic`, alias `kafka.topic`).
    #[serde(rename = "topic", alias = "kafka.topic")]
    pub topic: String,

    /// Option `properties.sync.call.timeout`, parsed by `deserialize_duration_from_string`.
    /// Defaults to the value of `default_kafka_sync_call_timeout` (5 seconds) when absent.
    #[serde(
        rename = "properties.sync.call.timeout",
        deserialize_with = "deserialize_duration_from_string",
        default = "default_kafka_sync_call_timeout"
    )]
    pub sync_call_timeout: Duration,
}
293
/// Private-link related Kafka option.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions, PartialEq, Hash, Eq)]
pub struct KafkaPrivateLinkCommon {
    /// Option `broker.rewrite.endpoints`: a JSON-encoded string that is decoded into a
    /// string-to-string map.
    // NOTE(review): presumably maps original broker addresses to rewritten endpoints — confirm
    // key/value direction against the consumer of this map.
    #[serde(rename = "broker.rewrite.endpoints")]
    #[serde_as(as = "Option<JsonString>")]
    pub broker_rewrite_map: Option<BTreeMap<String, String>>,
}
302
/// Serde default for `properties.sync.call.timeout`: five seconds.
const fn default_kafka_sync_call_timeout() -> Duration {
    const DEFAULT_SYNC_CALL_TIMEOUT_SECS: u64 = 5;
    Duration::from_secs(DEFAULT_SYNC_CALL_TIMEOUT_SECS)
}
306
/// Serde default for `properties.socket.keepalive.enable`: keepalive is on unless the option is
/// given explicitly.
const fn default_socket_keepalive_enable() -> bool {
    true
}
310
/// Client tuning options that `RdKafkaPropertiesCommon::set_client` copies onto an
/// `rdkafka::ClientConfig`. Values are supplied as strings and parsed via `DisplayFromStr`.
#[serde_as]
#[derive(Debug, Clone, Deserialize, WithOptions)]
pub struct RdKafkaPropertiesCommon {
    /// Forwarded as `message.max.bytes`.
    #[serde(rename = "properties.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub message_max_bytes: Option<usize>,

    /// Forwarded as `receive.message.max.bytes`.
    #[serde(rename = "properties.receive.message.max.bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub receive_message_max_bytes: Option<usize>,

    /// Forwarded as `statistics.interval.ms`.
    #[serde(rename = "properties.statistics.interval.ms")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub statistics_interval_ms: Option<usize>,

    /// Forwarded as `client.id`.
    #[serde(rename = "properties.client.id")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub client_id: Option<String>,

    /// Forwarded as `enable.ssl.certificate.verification`.
    #[serde(rename = "properties.enable.ssl.certificate.verification")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub enable_ssl_certificate_verification: Option<bool>,

    /// Forwarded as `socket.keepalive.enable`; defaults to `true` and is always set on the
    /// client.
    #[serde(
        rename = "properties.socket.keepalive.enable",
        default = "default_socket_keepalive_enable"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub socket_keepalive_enable: bool,
}
350
351impl RdKafkaPropertiesCommon {
352 pub(crate) fn set_client(&self, c: &mut rdkafka::ClientConfig) {
353 if let Some(v) = self.statistics_interval_ms {
354 c.set("statistics.interval.ms", v.to_string());
355 }
356 if let Some(v) = self.message_max_bytes {
357 c.set("message.max.bytes", v.to_string());
358 }
359 if let Some(v) = self.receive_message_max_bytes {
360 c.set("receive.message.max.bytes", v.to_string());
361 }
362 if let Some(v) = self.client_id.as_ref() {
363 c.set("client.id", v);
364 }
365 if let Some(v) = self.enable_ssl_certificate_verification {
366 c.set("enable.ssl.certificate.verification", v.to_string());
367 }
368 c.set(
369 "socket.keepalive.enable",
370 self.socket_keepalive_enable.to_string(),
371 );
372 }
373}
374
375impl KafkaConnectionProps {
376 #[cfg(test)]
377 pub fn test_default() -> Self {
378 Self {
379 brokers: "localhost:9092".to_owned(),
380 security_protocol: None,
381 ssl_ca_location: None,
382 ssl_certificate_location: None,
383 ssl_key_location: None,
384 ssl_ca_pem: None,
385 ssl_certificate_pem: None,
386 ssl_key_pem: None,
387 ssl_key_password: None,
388 ssl_endpoint_identification_algorithm: None,
389 sasl_mechanism: None,
390 sasl_username: None,
391 sasl_password: None,
392 sasl_kerberos_service_name: None,
393 sasl_kerberos_keytab: None,
394 sasl_kerberos_principal: None,
395 sasl_kerberos_kinit_cmd: None,
396 sasl_kerberos_min_time_before_relogin: None,
397 sasl_oathbearer_config: None,
398 }
399 }
400
401 pub(crate) fn set_security_properties(&self, config: &mut ClientConfig) {
402 if self.is_aws_msk_iam() {
404 config.set("security.protocol", "SASL_SSL");
405 config.set("sasl.mechanism", "OAUTHBEARER");
406 return;
407 }
408
409 if let Some(security_protocol) = self.security_protocol.as_ref() {
411 config.set("security.protocol", security_protocol);
412 }
413
414 if let Some(ssl_ca_location) = self.ssl_ca_location.as_ref() {
416 config.set("ssl.ca.location", ssl_ca_location);
417 }
418 if let Some(ssl_ca_pem) = self.ssl_ca_pem.as_ref() {
419 config.set("ssl.ca.pem", ssl_ca_pem);
420 }
421 if let Some(ssl_certificate_location) = self.ssl_certificate_location.as_ref() {
422 config.set("ssl.certificate.location", ssl_certificate_location);
423 }
424 if let Some(ssl_certificate_pem) = self.ssl_certificate_pem.as_ref() {
425 config.set("ssl.certificate.pem", ssl_certificate_pem);
426 }
427 if let Some(ssl_key_location) = self.ssl_key_location.as_ref() {
428 config.set("ssl.key.location", ssl_key_location);
429 }
430 if let Some(ssl_key_pem) = self.ssl_key_pem.as_ref() {
431 config.set("ssl.key.pem", ssl_key_pem);
432 }
433 if let Some(ssl_key_password) = self.ssl_key_password.as_ref() {
434 config.set("ssl.key.password", ssl_key_password);
435 }
436 if let Some(ssl_endpoint_identification_algorithm) =
437 self.ssl_endpoint_identification_algorithm.as_ref()
438 {
439 config.set(
441 "ssl.endpoint.identification.algorithm",
442 ssl_endpoint_identification_algorithm,
443 );
444 }
445
446 if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref() {
448 config.set("sasl.mechanism", sasl_mechanism);
449 }
450
451 if let Some(sasl_username) = self.sasl_username.as_ref() {
453 config.set("sasl.username", sasl_username);
454 }
455 if let Some(sasl_password) = self.sasl_password.as_ref() {
456 config.set("sasl.password", sasl_password);
457 }
458
459 if let Some(sasl_kerberos_service_name) = self.sasl_kerberos_service_name.as_ref() {
461 config.set("sasl.kerberos.service.name", sasl_kerberos_service_name);
462 }
463 if let Some(sasl_kerberos_keytab) = self.sasl_kerberos_keytab.as_ref() {
464 config.set("sasl.kerberos.keytab", sasl_kerberos_keytab);
465 }
466 if let Some(sasl_kerberos_principal) = self.sasl_kerberos_principal.as_ref() {
467 config.set("sasl.kerberos.principal", sasl_kerberos_principal);
468 }
469 if let Some(sasl_kerberos_kinit_cmd) = self.sasl_kerberos_kinit_cmd.as_ref() {
470 config.set("sasl.kerberos.kinit.cmd", sasl_kerberos_kinit_cmd);
471 }
472 if let Some(sasl_kerberos_min_time_before_relogin) =
473 self.sasl_kerberos_min_time_before_relogin.as_ref()
474 {
475 config.set(
476 "sasl.kerberos.min.time.before.relogin",
477 sasl_kerberos_min_time_before_relogin,
478 );
479 }
480
481 if let Some(sasl_oathbearer_config) = self.sasl_oathbearer_config.as_ref() {
483 config.set("sasl.oauthbearer.config", sasl_oathbearer_config);
484 }
485 config.set("enable.sasl.oauthbearer.unsecure.jwt", "true");
487 }
488
489 pub(crate) fn is_aws_msk_iam(&self) -> bool {
490 if let Some(sasl_mechanism) = self.sasl_mechanism.as_ref()
491 && sasl_mechanism == AWS_MSK_IAM_AUTH
492 {
493 true
494 } else {
495 false
496 }
497 }
498}
499
/// Pulsar connection options: topic, service URL and an optional token.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PulsarCommon {
    /// Topic name (`topic`, alias `pulsar.topic`).
    #[serde(rename = "topic", alias = "pulsar.topic")]
    pub topic: String,

    /// Service URL handed to `Pulsar::builder` (`service.url`, alias `pulsar.service.url`).
    #[serde(rename = "service.url", alias = "pulsar.service.url")]
    pub service_url: String,

    /// Token for token-based authentication (`auth.token`); only used when no OAuth options
    /// are given.
    #[serde(rename = "auth.token")]
    pub auth_token: Option<String>,
}
511
512impl EnforceSecret for PulsarCommon {
513 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
514 "pulsar.auth.token",
515 };
516}
517
/// OAuth2 options for Pulsar; converted into `OAuth2Params` by `PulsarCommon::build_client`.
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PulsarOauthCommon {
    /// Issuer URL (`oauth.issuer.url`).
    #[serde(rename = "oauth.issuer.url")]
    pub issuer_url: String,

    /// Location of the credentials (`oauth.credentials.url`). Only `file` and `s3` URL schemes
    /// are accepted by `PulsarCommon::build_client`.
    #[serde(rename = "oauth.credentials.url")]
    pub credentials_url: String,

    /// Audience (`oauth.audience`).
    #[serde(rename = "oauth.audience")]
    pub audience: String,

    /// Optional scope (`oauth.scope`).
    #[serde(rename = "oauth.scope")]
    pub scope: Option<String>,
}
532
533fn create_credential_temp_file(credentials: &[u8]) -> std::io::Result<NamedTempFile> {
534 let mut f = NamedTempFile::new()?;
535 f.write_all(credentials)?;
536 f.as_file().sync_all()?;
537 Ok(f)
538}
539
540impl PulsarCommon {
541 pub(crate) async fn build_client(
542 &self,
543 oauth: &Option<PulsarOauthCommon>,
544 aws_auth_props: &AwsAuthProps,
545 ) -> ConnectorResult<Pulsar<TokioExecutor>> {
546 let mut pulsar_builder = Pulsar::builder(&self.service_url, TokioExecutor);
547 let mut temp_file = None;
548 if let Some(oauth) = oauth.as_ref() {
549 let url = Url::parse(&oauth.credentials_url)?;
550 match url.scheme() {
551 "s3" => {
552 let credentials = load_file_descriptor_from_s3(&url, aws_auth_props).await?;
553 temp_file = Some(
554 create_credential_temp_file(&credentials)
555 .context("failed to create temp file for pulsar credentials")?,
556 );
557 }
558 "file" => {}
559 _ => {
560 bail!("invalid credentials_url, only file url and s3 url are supported",);
561 }
562 }
563
564 let auth_params = OAuth2Params {
565 issuer_url: oauth.issuer_url.clone(),
566 credentials_url: if temp_file.is_none() {
567 oauth.credentials_url.clone()
568 } else {
569 let mut raw_path = temp_file
570 .as_ref()
571 .unwrap()
572 .path()
573 .to_str()
574 .unwrap()
575 .to_owned();
576 raw_path.insert_str(0, "file://");
577 raw_path
578 },
579 audience: Some(oauth.audience.clone()),
580 scope: oauth.scope.clone(),
581 };
582
583 pulsar_builder = pulsar_builder
584 .with_auth_provider(OAuth2Authentication::client_credentials(auth_params));
585 } else if let Some(auth_token) = &self.auth_token {
586 pulsar_builder = pulsar_builder.with_auth(Authentication {
587 name: "token".to_owned(),
588 data: Vec::from(auth_token.as_str()),
589 });
590 }
591
592 let res = pulsar_builder.build().await.map_err(|e| anyhow!(e))?;
593 drop(temp_file);
594 Ok(res)
595 }
596}
597
/// Kinesis stream, credential and SDK tuning options; consumed by
/// `KinesisCommon::build_client`.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct KinesisCommon {
    /// Stream name (`stream`, alias `kinesis.stream.name`).
    #[serde(rename = "stream", alias = "kinesis.stream.name")]
    pub stream_name: String,
    /// Region of the stream (`aws.region`, alias `kinesis.stream.region`).
    #[serde(rename = "aws.region", alias = "kinesis.stream.region")]
    pub stream_region: String,
    /// Custom endpoint URL (`endpoint`, alias `kinesis.endpoint`).
    #[serde(rename = "endpoint", alias = "kinesis.endpoint")]
    pub endpoint: Option<String>,
    /// Access key id (`aws.credentials.access_key_id`, alias `kinesis.credentials.access`).
    #[serde(
        rename = "aws.credentials.access_key_id",
        alias = "kinesis.credentials.access"
    )]
    pub credentials_access_key: Option<String>,
    /// Secret access key (`aws.credentials.secret_access_key`, alias
    /// `kinesis.credentials.secret`).
    #[serde(
        rename = "aws.credentials.secret_access_key",
        alias = "kinesis.credentials.secret"
    )]
    pub credentials_secret_access_key: Option<String>,
    /// Session token (`aws.credentials.session_token`, alias
    /// `kinesis.credentials.session_token`).
    #[serde(
        rename = "aws.credentials.session_token",
        alias = "kinesis.credentials.session_token"
    )]
    pub session_token: Option<String>,
    /// Role to assume (`aws.credentials.role.arn`, alias `kinesis.assumerole.arn`).
    #[serde(rename = "aws.credentials.role.arn", alias = "kinesis.assumerole.arn")]
    pub assume_role_arn: Option<String>,
    /// External id used when assuming the role (`aws.credentials.role.external_id`, alias
    /// `kinesis.assumerole.external_id`).
    #[serde(
        rename = "aws.credentials.role.external_id",
        alias = "kinesis.assumerole.external_id"
    )]
    pub assume_role_external_id: Option<String>,

    /// SDK connect timeout in milliseconds; default 10000.
    #[serde(
        rename = "kinesis.sdk.connect_timeout_ms",
        default = "kinesis_default_connect_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_connect_timeout_ms: u64,

    /// SDK read timeout in milliseconds; default 10000.
    #[serde(
        rename = "kinesis.sdk.read_timeout_ms",
        default = "kinesis_default_read_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_read_timeout_ms: u64,

    /// SDK operation timeout in milliseconds; default 10000.
    #[serde(
        rename = "kinesis.sdk.operation_timeout_ms",
        default = "kinesis_default_operation_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_operation_timeout_ms: u64,

    /// SDK per-attempt operation timeout in milliseconds; default 10000.
    #[serde(
        rename = "kinesis.sdk.operation_attempt_timeout_ms",
        default = "kinesis_default_operation_attempt_timeout_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_operation_attempt_timeout_ms: u64,

    /// Value passed to the retry config's `with_max_attempts`; default 3.
    #[serde(
        rename = "kinesis.sdk.max_retry_limit",
        default = "kinesis_default_max_retry_limit"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_max_retry_limit: u32,

    /// Initial retry backoff in milliseconds; default 1000.
    #[serde(
        rename = "kinesis.sdk.init_backoff_ms",
        default = "kinesis_default_init_backoff_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_init_backoff_ms: u64,

    /// Maximum retry backoff in milliseconds; default 20000.
    #[serde(
        rename = "kinesis.sdk.max_backoff_ms",
        default = "kinesis_default_max_backoff_ms"
    )]
    #[serde_as(as = "DisplayFromStr")]
    pub sdk_max_backoff_ms: u64,
}
680
/// Unit type implementing the SDK's `AsyncSleep` trait on top of `tokio::time::sleep`.
#[derive(Debug)]
pub struct KinesisAsyncSleepImpl;
683
impl AsyncSleep for KinesisAsyncSleepImpl {
    /// Returns a `Sleep` that completes after `duration`, delegating to `tokio::time::sleep`.
    fn sleep(&self, duration: Duration) -> Sleep {
        Sleep::new(async move { tokio::time::sleep(duration).await })
    }
}
689
/// Serde default for `kinesis.sdk.connect_timeout_ms` (milliseconds).
const fn kinesis_default_connect_timeout_ms() -> u64 {
    10_000
}
693
/// Serde default for `kinesis.sdk.read_timeout_ms` (milliseconds).
const fn kinesis_default_read_timeout_ms() -> u64 {
    10_000
}
697
/// Serde default for `kinesis.sdk.operation_timeout_ms` (milliseconds).
const fn kinesis_default_operation_timeout_ms() -> u64 {
    10_000
}
701
/// Serde default for `kinesis.sdk.operation_attempt_timeout_ms` (milliseconds).
const fn kinesis_default_operation_attempt_timeout_ms() -> u64 {
    10_000
}
705
/// Serde default for `kinesis.sdk.init_backoff_ms` (milliseconds).
const fn kinesis_default_init_backoff_ms() -> u64 {
    1_000
}
709
/// Serde default for `kinesis.sdk.max_backoff_ms` (milliseconds).
const fn kinesis_default_max_backoff_ms() -> u64 {
    20_000
}
713
/// Serde default for `kinesis.sdk.max_retry_limit`; the value is passed to the retry config's
/// `with_max_attempts` in `KinesisCommon::build_client`.
const fn kinesis_default_max_retry_limit() -> u32 {
    3
}
717
718impl EnforceSecret for KinesisCommon {
719 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
720 "kinesis.credentials.access",
721 "kinesis.credentials.secret",
722 "kinesis.credentials.session_token",
723 };
724}
725
726impl KinesisCommon {
727 pub(crate) async fn build_client(&self) -> ConnectorResult<KinesisClient> {
728 let config = AwsAuthProps {
729 region: Some(self.stream_region.clone()),
730 endpoint: self.endpoint.clone(),
731 access_key: self.credentials_access_key.clone(),
732 secret_key: self.credentials_secret_access_key.clone(),
733 session_token: self.session_token.clone(),
734 arn: self.assume_role_arn.clone(),
735 external_id: self.assume_role_external_id.clone(),
736 profile: Default::default(),
737 msk_signer_timeout_sec: Default::default(),
738 };
739 let aws_config = config.build_config().await?;
740 let mut builder = aws_sdk_kinesis::config::Builder::from(&aws_config);
741 {
742 let sleep_impl = SharedAsyncSleep::new(KinesisAsyncSleepImpl);
744 builder.set_sleep_impl(Some(sleep_impl));
745 let timeout_config = aws_smithy_types::timeout::TimeoutConfig::builder()
746 .connect_timeout(Duration::from_millis(self.sdk_connect_timeout_ms))
747 .read_timeout(Duration::from_millis(self.sdk_read_timeout_ms))
748 .operation_timeout(Duration::from_millis(self.sdk_operation_timeout_ms))
749 .operation_attempt_timeout(Duration::from_millis(
750 self.sdk_operation_attempt_timeout_ms,
751 ))
752 .build();
753 builder.set_timeout_config(Some(timeout_config));
754
755 let retry_config = aws_smithy_types::retry::RetryConfig::standard()
756 .with_initial_backoff(Duration::from_millis(self.sdk_init_backoff_ms))
757 .with_max_backoff(Duration::from_millis(self.sdk_max_backoff_ms))
758 .with_max_attempts(self.sdk_max_retry_limit);
759 builder.set_retry_config(Some(retry_config));
760 }
761 if let Some(endpoint) = &config.endpoint {
762 builder = builder.endpoint_url(endpoint);
763 }
764 Ok(KinesisClient::from_conf(builder.build()))
765 }
766}
767
/// NATS connection, authentication and stream-limit options.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct NatsCommon {
    /// Comma-separated list of server addresses.
    #[serde(rename = "server_url")]
    pub server_url: String,
    /// Comma-separated list of subjects.
    #[serde(rename = "subject")]
    pub subject: String,
    /// One of `user_and_password`, `credential` or `plain`.
    #[serde(rename = "connect_mode")]
    pub connect_mode: String,
    /// User name (option `username`); required for `user_and_password`.
    #[serde(rename = "username")]
    pub user: Option<String>,
    /// Password; required for `user_and_password`.
    #[serde(rename = "password")]
    pub password: Option<String>,
    /// User JWT; required for `credential`.
    #[serde(rename = "jwt")]
    pub jwt: Option<String>,
    /// NKEY seed; required for `credential`.
    #[serde(rename = "nkey")]
    pub nkey: Option<String>,
    /// Overrides `max_bytes` of a newly created stream (default there is 1000000).
    #[serde(rename = "max_bytes")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_bytes: Option<i64>,
    /// Overrides `max_messages` of a newly created stream.
    #[serde(rename = "max_messages")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_messages: Option<i64>,
    /// Overrides `max_messages_per_subject` of a newly created stream.
    #[serde(rename = "max_messages_per_subject")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_messages_per_subject: Option<i64>,
    /// Overrides `max_consumers` of a newly created stream.
    #[serde(rename = "max_consumers")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_consumers: Option<i32>,
    /// Overrides `max_message_size` of a newly created stream.
    #[serde(rename = "max_message_size")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub max_message_size: Option<i32>,
}
801
/// Lists the option names of [`NatsCommon`] registered in `ENFORCE_SECRET_PROPERTIES`: the
/// password, the user JWT and the NKEY seed.
impl EnforceSecret for NatsCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "password",
        "jwt",
        "nkey",
    };
}
809
810impl NatsCommon {
811 pub(crate) async fn build_client(&self) -> ConnectorResult<async_nats::Client> {
812 let mut connect_options = async_nats::ConnectOptions::new();
813 match self.connect_mode.as_str() {
814 "user_and_password" => {
815 if let (Some(v_user), Some(v_password)) =
816 (self.user.as_ref(), self.password.as_ref())
817 {
818 connect_options =
819 connect_options.user_and_password(v_user.into(), v_password.into())
820 } else {
821 bail!("nats connect mode is user_and_password, but user or password is empty");
822 }
823 }
824
825 "credential" => {
826 if let (Some(v_nkey), Some(v_jwt)) = (self.nkey.as_ref(), self.jwt.as_ref()) {
827 connect_options = connect_options
828 .credentials(&self.create_credential(v_nkey, v_jwt)?)
829 .expect("failed to parse static creds")
830 } else {
831 bail!("nats connect mode is credential, but nkey or jwt is empty");
832 }
833 }
834 "plain" => {}
835 _ => {
836 bail!("nats connect mode only accepts user_and_password/credential/plain");
837 }
838 };
839
840 let servers = self.server_url.split(',').collect::<Vec<&str>>();
841 let client = connect_options
842 .connect(
843 servers
844 .iter()
845 .map(|url| url.parse())
846 .collect::<Result<Vec<async_nats::ServerAddr>, _>>()?,
847 )
848 .await
849 .context("build nats client error")
850 .map_err(SinkError::Nats)?;
851 Ok(client)
852 }
853
854 pub(crate) async fn build_context(&self) -> ConnectorResult<jetstream::Context> {
855 let client = self.build_client().await?;
856 let jetstream = async_nats::jetstream::new(client);
857 Ok(jetstream)
858 }
859
860 pub(crate) async fn build_consumer(
861 &self,
862 stream: String,
863 durable_consumer_name: String,
864 split_id: String,
865 start_sequence: NatsOffset,
866 mut config: jetstream::consumer::pull::Config,
867 ) -> ConnectorResult<
868 async_nats::jetstream::consumer::Consumer<async_nats::jetstream::consumer::pull::Config>,
869 > {
870 let context = self.build_context().await?;
871 let stream = self.build_or_get_stream(context.clone(), stream).await?;
872 let subject_name = self
873 .subject
874 .replace(',', "-")
875 .replace(['.', '>', '*', ' ', '\t'], "_");
876 let name = format!("risingwave-consumer-{}-{}", subject_name, split_id);
877
878 let deliver_policy = match start_sequence {
879 NatsOffset::Earliest => DeliverPolicy::All,
880 NatsOffset::Latest => DeliverPolicy::New,
881 NatsOffset::SequenceNumber(v) => {
882 let parsed = v
884 .parse::<u64>()
885 .context("failed to parse nats offset as sequence number")?;
886 DeliverPolicy::ByStartSequence {
887 start_sequence: 1 + parsed,
888 }
889 }
890 NatsOffset::Timestamp(v) => DeliverPolicy::ByStartTime {
891 start_time: OffsetDateTime::from_unix_timestamp_nanos(v as i128 * 1_000_000)
892 .context("invalid timestamp for nats offset")?,
893 },
894 NatsOffset::None => DeliverPolicy::All,
895 };
896
897 let consumer = match stream.get_consumer(&name).await {
898 Ok(consumer) => consumer,
899 _ => {
900 stream
901 .get_or_create_consumer(&name, {
902 config.deliver_policy = deliver_policy;
903 config.durable_name = Some(durable_consumer_name);
904 config.filter_subjects =
905 self.subject.split(',').map(|s| s.to_owned()).collect();
906 config
907 })
908 .await?
909 }
910 };
911 Ok(consumer)
912 }
913
914 pub(crate) async fn build_or_get_stream(
915 &self,
916 jetstream: jetstream::Context,
917 stream: String,
918 ) -> ConnectorResult<jetstream::stream::Stream> {
919 let subjects: Vec<String> = self.subject.split(',').map(|s| s.to_owned()).collect();
920 if let Ok(mut stream_instance) = jetstream.get_stream(&stream).await {
921 tracing::info!(
922 "load existing nats stream ({:?}) with config {:?}",
923 stream,
924 stream_instance.info().await?
925 );
926 return Ok(stream_instance);
927 }
928
929 let mut config = jetstream::stream::Config {
930 name: stream.clone(),
931 max_bytes: 1000000,
932 subjects,
933 ..Default::default()
934 };
935 if let Some(v) = self.max_bytes {
936 config.max_bytes = v;
937 }
938 if let Some(v) = self.max_messages {
939 config.max_messages = v;
940 }
941 if let Some(v) = self.max_messages_per_subject {
942 config.max_messages_per_subject = v;
943 }
944 if let Some(v) = self.max_consumers {
945 config.max_consumers = v;
946 }
947 if let Some(v) = self.max_message_size {
948 config.max_message_size = v;
949 }
950 tracing::info!(
951 "create nats stream ({:?}) with config {:?}",
952 &stream,
953 config
954 );
955 let stream = jetstream.get_or_create_stream(config).await?;
956 Ok(stream)
957 }
958
959 pub(crate) fn create_credential(&self, seed: &str, jwt: &str) -> ConnectorResult<String> {
960 let creds = format!(
961 "-----BEGIN NATS USER JWT-----\n{}\n------END NATS USER JWT------\n\n\
962 ************************* IMPORTANT *************************\n\
963 NKEY Seed printed below can be used to sign and prove identity.\n\
964 NKEYs are sensitive and should be treated as secrets.\n\n\
965 -----BEGIN USER NKEY SEED-----\n{}\n------END USER NKEY SEED------\n\n\
966 *************************************************************",
967 jwt, seed
968 );
969 Ok(creds)
970 }
971}
972
973pub(crate) fn load_certs(
974 certificates: &str,
975) -> ConnectorResult<Vec<rustls_pki_types::CertificateDer<'static>>> {
976 let cert_bytes = if let Some(path) = certificates.strip_prefix("fs://") {
977 std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
978 } else {
979 certificates.as_bytes().to_owned()
980 };
981
982 rustls_pemfile::certs(&mut cert_bytes.as_slice())
983 .map(|cert| Ok(cert?))
984 .collect()
985}
986
987pub(crate) fn load_private_key(
988 certificate: &str,
989) -> ConnectorResult<rustls_pki_types::PrivateKeyDer<'static>> {
990 let cert_bytes = if let Some(path) = certificate.strip_prefix("fs://") {
991 std::fs::read_to_string(path).map(|cert| cert.as_bytes().to_owned())?
992 } else {
993 certificate.as_bytes().to_owned()
994 };
995
996 let cert = rustls_pemfile::pkcs8_private_keys(&mut cert_bytes.as_slice())
997 .next()
998 .ok_or_else(|| anyhow!("No private key found"))?;
999 Ok(cert?.into())
1000}
1001
/// MongoDB connection options.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct MongodbCommon {
    /// Connection string (`mongodb.url`) passed to `mongodb::Client::with_uri_str`.
    #[serde(rename = "mongodb.url")]
    pub connect_uri: String,
    /// Option `collection.name`.
    // NOTE(review): not read in the visible part of this file; the expected format (for example
    // whether it includes the database name) should be confirmed against its consumer.
    #[serde(rename = "collection.name")]
    pub collection_name: String,
}
1014
/// Lists the option names of [`MongodbCommon`] registered in `ENFORCE_SECRET_PROPERTIES`: the
/// connection string.
impl EnforceSecret for MongodbCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "mongodb.url"
    };
}
1020
1021impl MongodbCommon {
1022 pub(crate) async fn build_client(&self) -> ConnectorResult<mongodb::Client> {
1023 let client = mongodb::Client::with_uri_str(&self.connect_uri).await?;
1024
1025 Ok(client)
1026 }
1027}