// risingwave_connector/source/filesystem/s3/mod.rs
pub mod enumerator;
use std::collections::HashMap;
pub use enumerator::S3SplitEnumerator;
use crate::source::filesystem::file_common::CompressionFormat;
mod source;
use serde::Deserialize;
pub use source::S3FileReader;
use crate::connector_common::AwsAuthProps;
use crate::source::filesystem::FsSplit;
use crate::source::{SourceProperties, UnknownFields};
/// Connector name of the S3 source; used as `SourceProperties::SOURCE_NAME` for `S3Properties`.
pub const S3_CONNECTOR: &str = "s3";
/// S3 source options deserialized with serde.
///
/// Each field is read from the key named in its `serde(rename = ...)` attribute.
/// This struct is embedded (flattened) into `S3Properties`.
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3PropertiesCommon {
/// Value of the `s3.region_name` option. No serde default is declared, so the key must be present.
/// Becomes `AwsAuthProps::region` when converting from `S3Properties`.
#[serde(rename = "s3.region_name")]
pub region_name: String,
/// Value of the `s3.bucket_name` option. No serde default is declared, so the key must be present.
#[serde(rename = "s3.bucket_name")]
pub bucket_name: String,
/// Value of the `match_pattern` option; `None` when the key is absent.
/// NOTE(review): presumably a pattern used to select objects in the bucket — confirm against the enumerator.
#[serde(rename = "match_pattern", default)]
pub match_pattern: Option<String>,
/// Value of the `s3.credentials.access` option; `None` when the key is absent.
/// Forwarded as `AwsAuthProps::access_key`.
#[serde(rename = "s3.credentials.access", default)]
pub access: Option<String>,
/// Value of the `s3.credentials.secret` option; `None` when the key is absent.
/// Forwarded as `AwsAuthProps::secret_key`.
#[serde(rename = "s3.credentials.secret", default)]
pub secret: Option<String>,
/// Value of the `s3.endpoint_url` option. Forwarded as `AwsAuthProps::endpoint`.
/// NOTE(review): unlike the other optional fields, no explicit `default` is declared here;
/// serde's derive normally treats a missing `Option` field as `None`, so this is presumably
/// still optional — confirm.
#[serde(rename = "s3.endpoint_url")]
pub endpoint_url: Option<String>,
/// Value of the `compression_format` option; falls back to `CompressionFormat`'s `Default`
/// value when the key is absent.
#[serde(rename = "compression_format", default = "Default::default")]
pub compression_format: CompressionFormat,
}
/// Properties of the S3 source: the known options plus any unrecognized keys.
///
/// Both fields are `serde(flatten)`ed, so they are deserialized from the same flat key space.
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct S3Properties {
/// The recognized S3 options.
#[serde(flatten)]
pub common: S3PropertiesCommon,
/// String key/value pairs captured by the flattened map; exposed through
/// `UnknownFields::unknown_fields`.
/// NOTE(review): with serde's `flatten`, this should hold only the keys not consumed by
/// `common`, which relies on `common` being declared first — confirm before reordering.
#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
/// Wraps a bare [`S3PropertiesCommon`] into an [`S3Properties`] that carries no unknown fields.
impl From<S3PropertiesCommon> for S3Properties {
    /// Takes ownership of `common` and pairs it with an empty `unknown_fields` map.
    fn from(common: S3PropertiesCommon) -> Self {
        // Nothing was parsed from a raw option map here, so there is nothing unrecognized to record.
        let unknown_fields = HashMap::default();
        Self {
            common,
            unknown_fields,
        }
    }
}
// Binds `S3Properties` to the concrete split, enumerator and reader types of the S3 source.
impl SourceProperties for S3Properties {
// Split type handled by this source.
type Split = FsSplit;
// Enumerator type, re-exported from the `enumerator` submodule.
type SplitEnumerator = S3SplitEnumerator;
// Reader type, re-exported from the `source` submodule.
type SplitReader = S3FileReader;
// Source name: the `S3_CONNECTOR` constant (`"s3"`).
const SOURCE_NAME: &'static str = S3_CONNECTOR;
}
/// Exposes the key/value pairs stored in `S3Properties::unknown_fields`.
impl UnknownFields for S3Properties {
    /// Returns an owned copy of the stored unknown fields; `self` is left untouched.
    fn unknown_fields(&self) -> HashMap<String, String> {
        HashMap::clone(&self.unknown_fields)
    }
}
/// Derives AWS authentication settings from the S3 source properties.
impl From<&S3Properties> for AwsAuthProps {
    /// Clones the region, endpoint and static credentials out of `props.common`.
    ///
    /// Every other `AwsAuthProps` field listed here is set to its type's `Default` value.
    fn from(props: &S3Properties) -> Self {
        let common = &props.common;

        // The region is mandatory on the S3 side, so it is always wrapped in `Some`.
        let region = Some(common.region_name.clone());
        let endpoint = common.endpoint_url.clone();
        let access_key = common.access.clone();
        let secret_key = common.secret.clone();

        Self {
            region,
            endpoint,
            access_key,
            secret_key,
            // The S3 properties carry no values for the fields below.
            session_token: Default::default(),
            arn: Default::default(),
            external_id: Default::default(),
            profile: Default::default(),
        }
    }
}