risingwave_connector/source/filesystem/opendal_source/
s3_source.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::S3;
21
22use super::OpendalSource;
23use super::opendal_enumerator::OpendalEnumerator;
24use crate::error::ConnectorResult;
25use crate::source::filesystem::file_common::CompressionFormat;
26use crate::source::filesystem::s3::S3PropertiesCommon;
27use crate::source::filesystem::s3::enumerator::get_prefix;
28
29impl<Src: OpendalSource> OpendalEnumerator<Src> {
30 pub fn new_s3_source(
32 s3_properties: &S3PropertiesCommon,
33 assume_role: Option<String>,
34 compression_format: CompressionFormat,
35 ) -> ConnectorResult<Self> {
36 let mut builder = S3::default()
38 .bucket(&s3_properties.bucket_name)
39 .region(&s3_properties.region_name);
40
41 if let Some(endpoint_url) = &s3_properties.endpoint_url {
42 builder = builder.endpoint(endpoint_url);
43 }
44
45 if let Some(access) = &s3_properties.access {
46 builder = builder.access_key_id(access);
47 }
48
49 if let Some(secret) = &s3_properties.secret {
50 builder = builder.secret_access_key(secret);
51 }
52
53 if let Some(assume_role) = assume_role {
54 builder = builder.role_arn(&assume_role);
55 }
56
57 let (prefix, matcher) = if let Some(pattern) = s3_properties.match_pattern.as_ref() {
58 let prefix = get_prefix(pattern);
59 let matcher = glob::Pattern::new(pattern)
60 .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
61 (Some(prefix), Some(matcher))
62 } else {
63 (None, None)
64 };
65
66 if !s3_properties.enable_config_load() {
68 builder = builder.disable_config_load();
69 }
70 let op: Operator = Operator::new(builder)?
71 .layer(LoggingLayer::default())
72 .layer(RetryLayer::default())
73 .finish();
74
75 Ok(Self {
76 op,
77 prefix,
78 matcher,
79 marker: PhantomData,
80 compression_format,
81 })
82 }
83}