risingwave_connector/source/filesystem/s3/
mod.rs

// Copyright 2022 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod enumerator;
16use phf::{Set, phf_set};
17use risingwave_common::util::env_var::env_var_is_true;
18use serde::Deserialize;
19
20use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
21use crate::deserialize_optional_bool_from_string;
22use crate::enforce_secret::EnforceSecret;
23use crate::source::SourceProperties;
24use crate::source::util::dummy::{
25    DummyProperties, DummySourceReader, DummySplit, DummySplitEnumerator,
26};
27
/// Connector name of the legacy S3 source.
///
/// See [`crate::source::OPENDAL_S3_CONNECTOR`] for the related opendal connector.
pub const LEGACY_S3_CONNECTOR: &str = "s3";
30
31/// These are supported by both `s3` and `s3_v2` (opendal) sources.
32#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
33pub struct S3PropertiesCommon {
34    #[serde(rename = "s3.region_name")]
35    pub region_name: String,
36    #[serde(rename = "s3.bucket_name")]
37    pub bucket_name: String,
38    #[serde(rename = "match_pattern", default)]
39    pub match_pattern: Option<String>,
40    #[serde(rename = "s3.credentials.access", default)]
41    pub access: Option<String>,
42    #[serde(rename = "s3.credentials.secret", default)]
43    pub secret: Option<String>,
44    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
45    pub enable_config_load: Option<bool>,
46    #[serde(rename = "s3.endpoint_url")]
47    pub endpoint_url: Option<String>,
48}
49
50impl S3PropertiesCommon {
51    pub fn enable_config_load(&self) -> bool {
52        // If the env var is set to true, we disable the default config load. (Cloud environment)
53        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
54            return false;
55        }
56        self.enable_config_load.unwrap_or(false)
57    }
58}
59
60impl EnforceSecret for S3PropertiesCommon {
61    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
62        "s3.credentials.access",
63        "s3.credentials.secret",
64    };
65}
66
/// Marker type used as the type parameter of the dummy source types that
/// stand in for the legacy `s3` connector.
#[derive(Clone, Debug, PartialEq)]
pub struct LegacyS3;
69
70/// Note: legacy s3 source is fully deprecated since v2.4.0.
71/// The properties and enumerator are kept, so that meta can start normally.
72pub type LegacyS3Properties = DummyProperties<LegacyS3>;
73
74/// Note: legacy s3 source is fully deprecated since v2.4.0.
75/// The properties and enumerator are kept, so that meta can start normally.
76pub type LegacyS3SplitEnumerator = DummySplitEnumerator<LegacyS3>;
77
78pub type LegacyFsSplit = DummySplit<LegacyS3>;
79
80impl SourceProperties for LegacyS3Properties {
81    type Split = LegacyFsSplit;
82    type SplitEnumerator = LegacyS3SplitEnumerator;
83    type SplitReader = DummySourceReader<LegacyS3>;
84
85    const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR;
86}