// risingwave_connector/source/filesystem/opendal_source/s3_source.rs
use std::marker::PhantomData;
use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use opendal::Operator;
use super::opendal_enumerator::OpendalEnumerator;
use super::OpendalSource;
use crate::error::ConnectorResult;
use crate::source::filesystem::s3::enumerator::get_prefix;
use crate::source::filesystem::s3::S3PropertiesCommon;
impl<Src: OpendalSource> OpendalEnumerator<Src> {
    /// Creates an enumerator backed by an opendal S3 [`Operator`].
    ///
    /// The bucket and region are always taken from `s3_properties`. The endpoint, access key id
    /// and secret access key are applied only when they are present, and `assume_role` (when
    /// `Some`) is handed to the builder as the role ARN. A missing access key id or secret
    /// access key is reported through `tracing::error!` but does not abort construction.
    ///
    /// When `match_pattern` is set, a listing prefix is derived from it via `get_prefix` and the
    /// pattern itself is compiled into a glob matcher; otherwise both are `None`.
    ///
    /// # Errors
    ///
    /// Returns an error if `match_pattern` is not a valid glob pattern, or if the operator
    /// cannot be created from the assembled builder.
    pub fn new_s3_source(
        s3_properties: S3PropertiesCommon,
        assume_role: Option<String>,
    ) -> ConnectorResult<Self> {
        // Mandatory settings first; every optional setting below re-binds `builder`.
        let builder = S3::default()
            .bucket(&s3_properties.bucket_name)
            .region(&s3_properties.region_name);

        let builder = match s3_properties.endpoint_url {
            Some(endpoint_url) => builder.endpoint(&endpoint_url),
            None => builder,
        };

        let builder = match s3_properties.access {
            Some(access) => builder.access_key_id(&access),
            None => {
                tracing::error!(
                    "access key id of aws s3 is not set, bucket {}",
                    s3_properties.bucket_name
                );
                builder
            }
        };

        let builder = match s3_properties.secret {
            Some(secret) => builder.secret_access_key(&secret),
            None => {
                tracing::error!(
                    "secret access key of aws s3 is not set, bucket {}",
                    s3_properties.bucket_name
                );
                builder
            }
        };

        let builder = match assume_role {
            Some(role_arn) => builder.role_arn(&role_arn),
            None => builder,
        };

        // NOTE(review): per opendal's S3 builder docs this turns off loading configuration from
        // the environment, so only the values set above should apply — confirm for the pinned
        // opendal version.
        let builder = builder.disable_config_load();

        // The pattern is validated before the operator is created, so an invalid glob fails
        // early with a descriptive error.
        let (prefix, matcher) = match s3_properties.match_pattern.as_ref() {
            Some(pattern) => {
                let listing_prefix = get_prefix(pattern);
                let glob_matcher = glob::Pattern::new(pattern)
                    .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
                (Some(listing_prefix), Some(glob_matcher))
            }
            None => (None, None),
        };

        let op = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();

        Ok(Self {
            op,
            prefix,
            matcher,
            marker: PhantomData,
            compression_format: s3_properties.compression_format,
        })
    }
}