risingwave_connector/source/filesystem/opendal_source/
s3_source.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::S3;
21
22use super::OpendalSource;
23use super::opendal_enumerator::OpendalEnumerator;
24use crate::error::ConnectorResult;
25use crate::source::filesystem::s3::S3PropertiesCommon;
26use crate::source::filesystem::s3::enumerator::get_prefix;
27
28impl<Src: OpendalSource> OpendalEnumerator<Src> {
29 pub fn new_s3_source(
31 s3_properties: S3PropertiesCommon,
32 assume_role: Option<String>,
33 ) -> ConnectorResult<Self> {
34 let mut builder = S3::default()
36 .bucket(&s3_properties.bucket_name)
37 .region(&s3_properties.region_name);
38
39 if let Some(endpoint_url) = s3_properties.endpoint_url {
40 builder = builder.endpoint(&endpoint_url);
41 }
42
43 if let Some(access) = s3_properties.access {
44 builder = builder.access_key_id(&access);
45 } else {
46 tracing::error!(
47 "access key id of aws s3 is not set, bucket {}",
48 s3_properties.bucket_name
49 );
50 }
51
52 if let Some(secret) = s3_properties.secret {
53 builder = builder.secret_access_key(&secret);
54 } else {
55 tracing::error!(
56 "secret access key of aws s3 is not set, bucket {}",
57 s3_properties.bucket_name
58 );
59 }
60
61 if let Some(assume_role) = assume_role {
62 builder = builder.role_arn(&assume_role);
63 }
64
65 builder = builder.disable_config_load();
66 let (prefix, matcher) = if let Some(pattern) = s3_properties.match_pattern.as_ref() {
67 let prefix = get_prefix(pattern);
68 let matcher = glob::Pattern::new(pattern)
69 .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
70 (Some(prefix), Some(matcher))
71 } else {
72 (None, None)
73 };
74 let compression_format = s3_properties.compression_format;
75 let op: Operator = Operator::new(builder)?
76 .layer(LoggingLayer::default())
77 .layer(RetryLayer::default())
78 .finish();
79
80 Ok(Self {
81 op,
82 prefix,
83 matcher,
84 marker: PhantomData,
85 compression_format,
86 })
87 }
88}