// risingwave_connector/source/filesystem/opendal_source/mod.rs
use std::collections::HashMap;
pub mod azblob_source;
pub mod gcs_source;
pub mod posix_fs_source;
pub mod s3_source;
use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr};
use with_options::WithOptions;
pub mod opendal_enumerator;
pub mod opendal_reader;
use self::opendal_enumerator::OpendalEnumerator;
use self::opendal_reader::OpendalReader;
use super::file_common::CompressionFormat;
pub use super::s3::S3PropertiesCommon;
use super::OpendalFsSplit;
use crate::error::ConnectorResult;
use crate::source::{SourceProperties, UnknownFields};
/// Connector name reported as `SOURCE_NAME` by `AzblobProperties`.
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name reported as `SOURCE_NAME` by `GcsProperties`.
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name reported as `SOURCE_NAME` by `OpendalS3Properties`.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name reported as `SOURCE_NAME` by `PosixFsProperties`.
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
/// Default refresh interval, in seconds.
// NOTE(review): presumably the fallback used when `refresh.interval.sec` is not
// provided; it is not referenced in this file, so confirm at the use site.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
/// Options shared by the file-system source property structs in this module.
///
/// It is embedded with `#[serde(flatten)]` as the `fs_common` field of
/// `GcsProperties`, `OpendalS3Properties`, `PosixFsProperties` and
/// `AzblobProperties`, so its keys live at the same level as theirs.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    /// Optional refresh interval in seconds, read from the `refresh.interval.sec` key.
    ///
    /// `DisplayFromStr` makes the value go through its string form, i.e. the
    /// option is supplied as text and parsed into a `u64`.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,
}
/// User-facing options of the `gcs` connector, deserialized from a flat
/// key/value option map.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    /// Bucket name, read from the required `gcs.bucket_name` key.
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,
    /// Optional credential string, read from the `gcs.credential` key.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,
    /// Optional service account, read from the `gcs.service_account` key.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,
    /// Optional value of the `match_pattern` key.
    // NOTE(review): presumably a pattern used to filter object names; the
    // matching itself is not visible in this file — confirm in the enumerator.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,
    /// Options shared with the other file-system sources, flattened into this level.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,
    /// Flattened map capturing additional options that are not matched by a
    /// named field of this struct; exposed through `UnknownFields`.
    // NOTE(review): serde handles flattened fields in declaration order, so keep
    // this after `fs_common` unless the ordering is verified to be irrelevant.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    /// Compression format, read from the `compression_format` key; falls back to
    /// `CompressionFormat::default()` when the key is missing.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
impl UnknownFields for GcsProperties {
    /// Returns an owned copy of the options captured in `unknown_fields`.
    fn unknown_fields(&self) -> HashMap<String, String> {
        HashMap::clone(&self.unknown_fields)
    }
}
/// Binds the GCS options to the OpenDAL split, enumerator and reader types
/// specialised for `OpendalGcs`.
impl SourceProperties for GcsProperties {
    const SOURCE_NAME: &'static str = GCS_CONNECTOR;

    type SplitReader = OpendalReader<OpendalGcs>;
    type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
    type Split = OpendalFsSplit<OpendalGcs>;
}
/// Implemented by the unit marker types of this module (`OpendalS3`,
/// `OpendalGcs`, `OpendalPosixFs`, `OpendalAzblob`).
///
/// Each implementor names the properties struct it is configured with and
/// knows how to turn those properties into an `OpendalEnumerator` for itself.
pub trait OpendalSource: Clone + PartialEq + Send + Sync + 'static {
    /// Properties struct this source is configured with.
    type Properties: Send + Sync + SourceProperties;

    /// Consumes `properties` and builds the enumerator for this source.
    ///
    /// # Errors
    ///
    /// Returns whatever error the underlying enumerator constructor reports.
    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct OpendalS3;
impl OpendalSource for OpendalS3 {
type Properties = OpendalS3Properties;
fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
OpendalEnumerator::new_s3_source(properties.s3_properties, properties.assume_role)
}
}
/// Unit marker type selecting the GCS flavour of the OpenDAL source.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct OpendalGcs;

impl OpendalSource for OpendalGcs {
    type Properties = GcsProperties;

    /// Hands the whole properties struct to the GCS enumerator constructor.
    fn new_enumerator(
        properties: GcsProperties,
    ) -> ConnectorResult<OpendalEnumerator<OpendalGcs>> {
        OpendalEnumerator::new_gcs_source(properties)
    }
}
/// Unit marker type selecting the POSIX file-system flavour of the OpenDAL source.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct OpendalPosixFs;

impl OpendalSource for OpendalPosixFs {
    type Properties = PosixFsProperties;

    /// Hands the whole properties struct to the POSIX fs enumerator constructor.
    fn new_enumerator(
        properties: PosixFsProperties,
    ) -> ConnectorResult<OpendalEnumerator<OpendalPosixFs>> {
        OpendalEnumerator::new_posix_fs_source(properties)
    }
}
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
#[serde(flatten)]
pub s3_properties: S3PropertiesCommon,
#[serde(rename = "s3.assume_role", default)]
pub assume_role: Option<String>,
#[serde(flatten)]
pub fs_common: FsSourceCommon,
#[serde(flatten)]
pub unknown_fields: HashMap<String, String>,
}
impl UnknownFields for OpendalS3Properties {
    /// Returns an owned copy of the options captured in `unknown_fields`.
    fn unknown_fields(&self) -> HashMap<String, String> {
        HashMap::clone(&self.unknown_fields)
    }
}
/// Binds the S3 options to the OpenDAL split, enumerator and reader types
/// specialised for `OpendalS3`.
impl SourceProperties for OpendalS3Properties {
    const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;

    type SplitReader = OpendalReader<OpendalS3>;
    type SplitEnumerator = OpendalEnumerator<OpendalS3>;
    type Split = OpendalFsSplit<OpendalS3>;
}
/// User-facing options of the `posix_fs` connector, deserialized from a flat
/// key/value option map.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    /// Root location, read from the required `posix_fs.root` key.
    #[serde(rename = "posix_fs.root")]
    pub root: String,
    /// Optional value of the `match_pattern` key.
    // NOTE(review): presumably a pattern used to filter file names; the
    // matching itself is not visible in this file — confirm in the enumerator.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,
    /// Options shared with the other file-system sources, flattened into this level.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,
    /// Flattened map capturing additional options that are not matched by a
    /// named field of this struct; exposed through `UnknownFields`.
    // NOTE(review): serde handles flattened fields in declaration order, so keep
    // this after `fs_common` unless the ordering is verified to be irrelevant.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    /// Compression format, read from the `compression_format` key; falls back to
    /// `CompressionFormat::default()` when the key is missing.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
impl UnknownFields for PosixFsProperties {
    /// Returns an owned copy of the options captured in `unknown_fields`.
    fn unknown_fields(&self) -> HashMap<String, String> {
        HashMap::clone(&self.unknown_fields)
    }
}
/// Binds the POSIX fs options to the OpenDAL split, enumerator and reader
/// types specialised for `OpendalPosixFs`.
impl SourceProperties for PosixFsProperties {
    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;

    type SplitReader = OpendalReader<OpendalPosixFs>;
    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
    type Split = OpendalFsSplit<OpendalPosixFs>;
}
/// User-facing options of the `azblob` connector, deserialized from a flat
/// key/value option map.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    /// Container name, read from the required `azblob.container_name` key.
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,
    /// Optional account name, read from the `azblob.credentials.account_name` key.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    /// Optional account key, read from the `azblob.credentials.account_key` key.
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    /// Endpoint URL, read from the required `azblob.endpoint_url` key.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,
    /// Optional value of the `match_pattern` key.
    // NOTE(review): presumably a pattern used to filter blob names; the
    // matching itself is not visible in this file — confirm in the enumerator.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,
    /// Options shared with the other file-system sources, flattened into this level.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,
    /// Flattened map capturing additional options that are not matched by a
    /// named field of this struct; exposed through `UnknownFields`.
    // NOTE(review): serde handles flattened fields in declaration order, so keep
    // this after `fs_common` unless the ordering is verified to be irrelevant.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    /// Compression format, read from the `compression_format` key; falls back to
    /// `CompressionFormat::default()` when the key is missing.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
impl UnknownFields for AzblobProperties {
    /// Returns an owned copy of the options captured in `unknown_fields`.
    fn unknown_fields(&self) -> HashMap<String, String> {
        HashMap::clone(&self.unknown_fields)
    }
}
/// Binds the Azure Blob options to the OpenDAL split, enumerator and reader
/// types specialised for `OpendalAzblob`.
impl SourceProperties for AzblobProperties {
    const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;

    type SplitReader = OpendalReader<OpendalAzblob>;
    type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
    type Split = OpendalFsSplit<OpendalAzblob>;
}
/// Unit marker type selecting the Azure Blob flavour of the OpenDAL source.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct OpendalAzblob;

impl OpendalSource for OpendalAzblob {
    type Properties = AzblobProperties;

    /// Hands the whole properties struct to the Azure Blob enumerator constructor.
    fn new_enumerator(
        properties: AzblobProperties,
    ) -> ConnectorResult<OpendalEnumerator<OpendalAzblob>> {
        OpendalEnumerator::new_azblob_source(properties)
    }
}