risingwave_connector/source/filesystem/opendal_source/
s3_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::S3;
21
22use super::OpendalSource;
23use super::opendal_enumerator::OpendalEnumerator;
24use crate::error::ConnectorResult;
25use crate::source::filesystem::file_common::CompressionFormat;
26use crate::source::filesystem::s3::S3PropertiesCommon;
27use crate::source::filesystem::s3::enumerator::get_prefix;
28
29impl<Src: OpendalSource> OpendalEnumerator<Src> {
30    /// create opendal s3 source.
31    pub fn new_s3_source(
32        s3_properties: &S3PropertiesCommon,
33        assume_role: Option<String>,
34        compression_format: CompressionFormat,
35    ) -> ConnectorResult<Self> {
36        // Create s3 builder.
37        let mut builder = S3::default()
38            .bucket(&s3_properties.bucket_name)
39            .region(&s3_properties.region_name);
40
41        if let Some(endpoint_url) = &s3_properties.endpoint_url {
42            builder = builder.endpoint(endpoint_url);
43        }
44
45        if let Some(access) = &s3_properties.access {
46            builder = builder.access_key_id(access);
47        }
48
49        if let Some(secret) = &s3_properties.secret {
50            builder = builder.secret_access_key(secret);
51        }
52
53        if let Some(assume_role) = assume_role {
54            builder = builder.role_arn(&assume_role);
55        }
56
57        let (prefix, matcher) = if let Some(pattern) = s3_properties.match_pattern.as_ref() {
58            let prefix = get_prefix(pattern);
59            let matcher = glob::Pattern::new(pattern)
60                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
61            (Some(prefix), Some(matcher))
62        } else {
63            (None, None)
64        };
65
66        // Default behavior is disable loading config from environment.
67        if !s3_properties.enable_config_load() {
68            builder = builder.disable_config_load();
69        }
70        let op: Operator = Operator::new(builder)?
71            .layer(LoggingLayer::default())
72            .layer(RetryLayer::default())
73            .finish();
74
75        Ok(Self {
76            op,
77            prefix,
78            matcher,
79            marker: PhantomData,
80            compression_format,
81        })
82    }
83}