risingwave_connector/source/filesystem/s3/
mod.rs1pub mod enumerator;
16use phf::{Set, phf_set};
17use risingwave_common::util::env_var::env_var_is_true;
18use serde::Deserialize;
19
20use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
21use crate::deserialize_optional_bool_from_string;
22use crate::enforce_secret::EnforceSecret;
23use crate::source::SourceProperties;
24use crate::source::util::dummy::{
25 DummyProperties, DummySourceReader, DummySplit, DummySplitEnumerator,
26};
27
28pub const LEGACY_S3_CONNECTOR: &str = "s3";
30
31#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
33pub struct S3PropertiesCommon {
34 #[serde(rename = "s3.region_name")]
35 pub region_name: String,
36 #[serde(rename = "s3.bucket_name")]
37 pub bucket_name: String,
38 #[serde(rename = "match_pattern", default)]
39 pub match_pattern: Option<String>,
40 #[serde(rename = "s3.credentials.access", default)]
41 pub access: Option<String>,
42 #[serde(rename = "s3.credentials.secret", default)]
43 pub secret: Option<String>,
44 #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
45 pub enable_config_load: Option<bool>,
46 #[serde(rename = "s3.endpoint_url")]
47 pub endpoint_url: Option<String>,
48}
49
50impl S3PropertiesCommon {
51 pub fn enable_config_load(&self) -> bool {
52 if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
54 return false;
55 }
56 self.enable_config_load.unwrap_or(false)
57 }
58}
59
60impl EnforceSecret for S3PropertiesCommon {
61 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
62 "s3.credentials.access",
63 "s3.credentials.secret",
64 };
65}
66
67#[derive(Debug, Clone, PartialEq)]
68pub struct LegacyS3;
69
70pub type LegacyS3Properties = DummyProperties<LegacyS3>;
73
74pub type LegacyS3SplitEnumerator = DummySplitEnumerator<LegacyS3>;
77
78pub type LegacyFsSplit = DummySplit<LegacyS3>;
79
80impl SourceProperties for LegacyS3Properties {
81 type Split = LegacyFsSplit;
82 type SplitEnumerator = LegacyS3SplitEnumerator;
83 type SplitReader = DummySourceReader<LegacyS3>;
84
85 const SOURCE_NAME: &'static str = LEGACY_S3_CONNECTOR;
86}