risingwave_connector/source/filesystem/opendal_source/
s3_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::S3;
21
22use super::OpendalSource;
23use super::opendal_enumerator::OpendalEnumerator;
24use crate::error::ConnectorResult;
25use crate::source::filesystem::s3::S3PropertiesCommon;
26use crate::source::filesystem::s3::enumerator::get_prefix;
27
28impl<Src: OpendalSource> OpendalEnumerator<Src> {
29    /// create opendal s3 source.
30    pub fn new_s3_source(
31        s3_properties: S3PropertiesCommon,
32        assume_role: Option<String>,
33    ) -> ConnectorResult<Self> {
34        // Create s3 builder.
35        let mut builder = S3::default()
36            .bucket(&s3_properties.bucket_name)
37            .region(&s3_properties.region_name);
38
39        if let Some(endpoint_url) = s3_properties.endpoint_url {
40            builder = builder.endpoint(&endpoint_url);
41        }
42
43        if let Some(access) = s3_properties.access {
44            builder = builder.access_key_id(&access);
45        } else {
46            tracing::error!(
47                "access key id of aws s3 is not set, bucket {}",
48                s3_properties.bucket_name
49            );
50        }
51
52        if let Some(secret) = s3_properties.secret {
53            builder = builder.secret_access_key(&secret);
54        } else {
55            tracing::error!(
56                "secret access key of aws s3 is not set, bucket {}",
57                s3_properties.bucket_name
58            );
59        }
60
61        if let Some(assume_role) = assume_role {
62            builder = builder.role_arn(&assume_role);
63        }
64
65        builder = builder.disable_config_load();
66        let (prefix, matcher) = if let Some(pattern) = s3_properties.match_pattern.as_ref() {
67            let prefix = get_prefix(pattern);
68            let matcher = glob::Pattern::new(pattern)
69                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
70            (Some(prefix), Some(matcher))
71        } else {
72            (None, None)
73        };
74        let compression_format = s3_properties.compression_format;
75        let op: Operator = Operator::new(builder)?
76            .layer(LoggingLayer::default())
77            .layer(RetryLayer::default())
78            .finish();
79
80        Ok(Self {
81            op,
82            prefix,
83            matcher,
84            marker: PhantomData,
85            compression_format,
86        })
87    }
88}