// risingwave_connector/source/filesystem/opendal_source/mod.rs
use std::collections::HashMap;
16
17pub use opendal_enumerator::OpendalEnumerator;
18
19pub mod azblob_source;
20pub mod gcs_source;
21pub mod posix_fs_source;
22pub mod s3_source;
23
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26use with_options::WithOptions;
27pub mod opendal_enumerator;
28pub mod opendal_reader;
29
30use phf::{Set, phf_set};
31
32use self::opendal_reader::OpendalReader;
33use super::OpendalFsSplit;
34use super::file_common::CompressionFormat;
35pub use super::s3::S3PropertiesCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::source::{SourceProperties, UnknownFields};
39
/// Connector name used as `SourceProperties::SOURCE_NAME` for [`AzblobProperties`].
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name used as `SourceProperties::SOURCE_NAME` for [`GcsProperties`].
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name used as `SourceProperties::SOURCE_NAME` for [`OpendalS3Properties`].
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name used as `SourceProperties::SOURCE_NAME` for [`PosixFsProperties`].
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

/// Default refresh interval, in seconds.
// NOTE(review): presumably the fallback applied when `refresh.interval.sec` is not supplied;
// the use site is outside this file — confirm there.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
49
/// Options shared by every file-system source property struct in this module.
///
/// Each of [`GcsProperties`], [`OpendalS3Properties`], [`PosixFsProperties`] and
/// [`AzblobProperties`] embeds this struct with `#[serde(flatten)]`, so its keys live in the same
/// flat option map as the connector-specific keys.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    /// Value of the `refresh.interval.sec` option.
    ///
    /// The option arrives as a string and is converted to `u64` through `DisplayFromStr`.
    // NOTE(review): the key name suggests the unit is seconds, matching
    // `DEFAULT_REFRESH_INTERVAL_SEC` — confirm where the value is consumed.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,
}
57
/// Options accepted by the `gcs` source connector (see [`GCS_CONNECTOR`]).
///
/// Field order is significant for the `#[serde(flatten)]` members: `fs_common` is declared
/// before the catch-all `unknown_fields` map.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    /// Required `gcs.bucket_name` option.
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    /// Optional `gcs.credential` option; this key is listed in
    /// `ENFORCE_SECRET_PROPERTIES` for this type.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,

    /// Optional `gcs.service_account` option; this key is listed in
    /// `ENFORCE_SECRET_PROPERTIES` for this type.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,

    /// Optional `match_pattern` option.
    // NOTE(review): presumably a pattern used to filter which objects are listed — confirm in
    // the enumerator.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Options shared by all file-system sources, flattened into the same key space.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Flattened string map for options not matched by the fields above; exposed through the
    /// `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    /// `compression_format` option; falls back to `CompressionFormat`'s `Default` value when
    /// the key is absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
83
/// Registers the GCS option keys that are subject to the `EnforceSecret` check.
// NOTE(review): presumably these keys must be supplied through a secret reference rather than
// as plain text — the trait definition is outside this file, confirm there.
impl EnforceSecret for GcsProperties {
    /// Option keys checked by `EnforceSecret` for the GCS source.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.credential",
        "gcs.service_account"
    };
}
90
91impl UnknownFields for GcsProperties {
92 fn unknown_fields(&self) -> HashMap<String, String> {
93 self.unknown_fields.clone()
94 }
95}
96
97impl SourceProperties for GcsProperties {
98 type Split = OpendalFsSplit<OpendalGcs>;
99 type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
100 type SplitReader = OpendalReader<OpendalGcs>;
101
102 const SOURCE_NAME: &'static str = GCS_CONNECTOR;
103}
104
105pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
106 type Properties: SourceProperties + Send + Sync;
107
108 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
109}
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct OpendalS3;
113
114impl OpendalSource for OpendalS3 {
115 type Properties = OpendalS3Properties;
116
117 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
118 OpendalEnumerator::new_s3_source(properties.s3_properties, properties.assume_role)
119 }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub struct OpendalGcs;
124
125impl OpendalSource for OpendalGcs {
126 type Properties = GcsProperties;
127
128 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
129 OpendalEnumerator::new_gcs_source(properties)
130 }
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub struct OpendalPosixFs;
135
136impl OpendalSource for OpendalPosixFs {
137 type Properties = PosixFsProperties;
138
139 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
140 OpendalEnumerator::new_posix_fs_source(properties)
141 }
142}
143
144#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
145pub struct OpendalS3Properties {
146 #[serde(flatten)]
147 pub s3_properties: S3PropertiesCommon,
148
149 #[serde(rename = "s3.assume_role", default)]
151 pub assume_role: Option<String>,
152
153 #[serde(flatten)]
154 pub fs_common: FsSourceCommon,
155
156 #[serde(flatten)]
157 pub unknown_fields: HashMap<String, String>,
158}
159
/// Secret enforcement for the `s3_v2` source is delegated entirely to `S3PropertiesCommon`.
impl EnforceSecret for OpendalS3Properties {
    /// Forwards the iterator of property names, unchanged, to
    /// `S3PropertiesCommon::enforce_secret` and returns its result.
    ///
    /// No additional check is applied here for `s3.assume_role` or the shared file-system
    /// options.
    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
        S3PropertiesCommon::enforce_secret(prop_iter)
    }
}
165
166impl UnknownFields for OpendalS3Properties {
167 fn unknown_fields(&self) -> HashMap<String, String> {
168 self.unknown_fields.clone()
169 }
170}
171
172impl SourceProperties for OpendalS3Properties {
173 type Split = OpendalFsSplit<OpendalS3>;
174 type SplitEnumerator = OpendalEnumerator<OpendalS3>;
175 type SplitReader = OpendalReader<OpendalS3>;
176
177 const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
178}
179
/// Options accepted by the `posix_fs` source connector (see [`POSIX_FS_CONNECTOR`]).
///
/// Field order is significant for the `#[serde(flatten)]` members: `fs_common` is declared
/// before the catch-all `unknown_fields` map.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    /// Required `posix_fs.root` option.
    // NOTE(review): presumably the directory under which files are discovered — confirm in
    // the enumerator.
    #[serde(rename = "posix_fs.root")]
    pub root: String,

    /// Optional `match_pattern` option.
    // NOTE(review): presumably a pattern used to filter which files are listed — confirm in
    // the enumerator.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Options shared by all file-system sources, flattened into the same key space.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Flattened string map for options not matched by the fields above; exposed through the
    /// `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    /// `compression_format` option; falls back to `CompressionFormat`'s `Default` value when
    /// the key is absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
198
/// The POSIX file-system source performs no secret check on its options.
impl EnforceSecret for PosixFsProperties {
    /// Always succeeds; the property names are never inspected.
    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
        Ok(())
    }
}
204
205impl UnknownFields for PosixFsProperties {
206 fn unknown_fields(&self) -> HashMap<String, String> {
207 self.unknown_fields.clone()
208 }
209}
210
211impl SourceProperties for PosixFsProperties {
212 type Split = OpendalFsSplit<OpendalPosixFs>;
213 type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
214 type SplitReader = OpendalReader<OpendalPosixFs>;
215
216 const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
217}
218
/// Options accepted by the `azblob` source connector (see [`AZBLOB_CONNECTOR`]).
///
/// Field order is significant for the `#[serde(flatten)]` members: `fs_common` is declared
/// before the catch-all `unknown_fields` map.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    /// Required `azblob.container_name` option.
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    /// Optional `azblob.credentials.account_name` option; this key is listed in
    /// `ENFORCE_SECRET_PROPERTIES` for this type.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    /// Optional `azblob.credentials.account_key` option; this key is listed in
    /// `ENFORCE_SECRET_PROPERTIES` for this type.
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    /// Required `azblob.endpoint_url` option.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    /// Optional `match_pattern` option.
    // NOTE(review): presumably a pattern used to filter which blobs are listed — confirm in
    // the enumerator.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Options shared by all file-system sources, flattened into the same key space.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Flattened string map for options not matched by the fields above; exposed through the
    /// `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    /// `compression_format` option; falls back to `CompressionFormat`'s `Default` value when
    /// the key is absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
243
/// Registers the Azure Blob option keys that are subject to the `EnforceSecret` check.
// NOTE(review): presumably these keys must be supplied through a secret reference rather than
// as plain text — the trait definition is outside this file, confirm there.
impl EnforceSecret for AzblobProperties {
    /// Option keys checked by `EnforceSecret` for the Azure Blob source; both the account key
    /// and the account name are included.
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "azblob.credentials.account_key",
        "azblob.credentials.account_name",
    };
}
250
251impl UnknownFields for AzblobProperties {
252 fn unknown_fields(&self) -> HashMap<String, String> {
253 self.unknown_fields.clone()
254 }
255}
256
257impl SourceProperties for AzblobProperties {
258 type Split = OpendalFsSplit<OpendalAzblob>;
259 type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
260 type SplitReader = OpendalReader<OpendalAzblob>;
261
262 const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
263}
264
265#[derive(Debug, Clone, Copy, PartialEq, Eq)]
266pub struct OpendalAzblob;
267
268impl OpendalSource for OpendalAzblob {
269 type Properties = AzblobProperties;
270
271 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
272 OpendalEnumerator::new_azblob_source(properties)
273 }
274}