risingwave_connector/source/filesystem/s3/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14pub mod enumerator;
15use phf::{Set, phf_set};
16use risingwave_common::util::env_var::env_var_is_true;
17use serde::Deserialize;
18
19use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
20use crate::deserialize_optional_bool_from_string;
21use crate::enforce_secret::EnforceSecret;
22use crate::source::SourceProperties;
23use crate::source::util::dummy::{
24    DummyProperties, DummySourceReader, DummySplit, DummySplitEnumerator,
25};
26
/// Connector name (`"s3"`) under which the legacy S3 source is registered.
///
/// Refer to [`crate::source::OPENDAL_S3_CONNECTOR`].
pub const LEGACY_S3_CONNECTOR: &str = "s3";
29
/// Connection and credential options shared by the S3 sources.
///
/// These are supported by both `s3` and `s3_v2` (opendal) sources.
///
/// Each field is deserialized from the option key named in its `serde(rename = ...)`
/// attribute (or from the field name itself when no rename is given).
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3PropertiesCommon {
    // Required. Option key: `s3.region_name`.
    #[serde(rename = "s3.region_name")]
    pub region_name: String,
    // Required. Option key: `s3.bucket_name`.
    #[serde(rename = "s3.bucket_name")]
    pub bucket_name: String,
    // Optional. Option key: `match_pattern`.
    // NOTE(review): presumably a pattern used to filter object names — confirm against
    // the enumerator/reader that consumes it.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,
    // Optional. Option key: `s3.credentials.access`. Listed in
    // `ENFORCE_SECRET_PROPERTIES` for this type.
    #[serde(rename = "s3.credentials.access", default)]
    pub access: Option<String>,
    // Optional. Option key: `s3.credentials.secret`. Listed in
    // `ENFORCE_SECRET_PROPERTIES` for this type.
    #[serde(rename = "s3.credentials.secret", default)]
    pub secret: Option<String>,
    // Optional. Option key: `enable_config_load` (no rename). Parsed by
    // `deserialize_optional_bool_from_string`. Callers should read the effective value
    // through the `enable_config_load()` method, which also honours the
    // `DISABLE_DEFAULT_CREDENTIAL` environment flag.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,
    // Optional. Option key: `s3.endpoint_url`.
    #[serde(rename = "s3.endpoint_url")]
    pub endpoint_url: Option<String>,
}
48
49impl S3PropertiesCommon {
50    pub fn enable_config_load(&self) -> bool {
51        // If the env var is set to true, we disable the default config load. (Cloud environment)
52        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
53            return false;
54        }
55        self.enable_config_load.unwrap_or(false)
56    }
57}
58
/// Declares which option keys of [`S3PropertiesCommon`] are covered by [`EnforceSecret`].
///
/// The two keys are the serde names of the `access` and `secret` credential fields.
/// NOTE(review): presumably these must be provided as secrets rather than plaintext —
/// see the [`EnforceSecret`] trait for the exact enforcement rules.
impl EnforceSecret for S3PropertiesCommon {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "s3.credentials.access",
        "s3.credentials.secret",
    };
}
65
/// Unit type used as the type parameter that specializes the `Dummy*` source types
/// (properties, split, enumerator, reader) for the legacy `s3` connector.
#[derive(Debug, Clone, PartialEq)]
pub struct LegacyS3;
68
/// Properties type of the legacy `s3` source: [`DummyProperties`] specialized with
/// [`LegacyS3`].
///
/// Note: legacy s3 source is fully deprecated since v2.4.0.
/// The properties and enumerator are kept, so that meta can start normally.
pub type LegacyS3Properties = DummyProperties<LegacyS3>;
72
/// Split enumerator type of the legacy `s3` source: [`DummySplitEnumerator`]
/// specialized with [`LegacyS3`].
///
/// Note: legacy s3 source is fully deprecated since v2.4.0.
/// The properties and enumerator are kept, so that meta can start normally.
pub type LegacyS3SplitEnumerator = DummySplitEnumerator<LegacyS3>;
76
/// Split type of the legacy `s3` source: [`DummySplit`] specialized with [`LegacyS3`].
pub type LegacyFsSplit = DummySplit<LegacyS3>;
78
79impl SourceProperties for LegacyS3Properties {
80    type Split = LegacyFsSplit;
81    type SplitEnumerator = LegacyS3SplitEnumerator;
82    type SplitReader = DummySourceReader<LegacyS3>;
83
84    const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR;
85}