risingwave_connector/source/filesystem/opendal_source/
mod.rs1use std::collections::HashMap;
16
17pub use batch_posix_fs_source::{BatchPosixFsEnumerator, BatchPosixFsReader, BatchPosixFsSplit};
18pub use opendal_enumerator::OpendalEnumerator;
19
20pub mod azblob_source;
21pub mod batch_posix_fs_source;
22pub mod gcs_source;
23pub mod posix_fs_source;
24pub mod s3_source;
25
26use serde::Deserialize;
27use serde_with::{DisplayFromStr, serde_as};
28use with_options::WithOptions;
29pub mod opendal_enumerator;
30pub mod opendal_reader;
31
32use phf::{Set, phf_set};
33
34use self::opendal_reader::OpendalReader;
35use super::OpendalFsSplit;
36use super::file_common::CompressionFormat;
37pub use super::s3::S3PropertiesCommon;
38use crate::deserialize_optional_bool_from_string;
39use crate::enforce_secret::EnforceSecret;
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::{SourceProperties, UnknownFields};
42
/// `SOURCE_NAME` of [`AzblobProperties`] (Azure Blob Storage).
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// `SOURCE_NAME` of [`GcsProperties`] (Google Cloud Storage).
pub const GCS_CONNECTOR: &str = "gcs";
/// `SOURCE_NAME` of [`OpendalS3Properties`], the OpenDAL-backed S3 source.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// `SOURCE_NAME` of [`PosixFsProperties`] (POSIX file system).
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
/// `SOURCE_NAME` of [`BatchPosixFsProperties`]; as the value itself says, meant for tests only.
pub const BATCH_POSIX_FS_CONNECTOR: &str = "__for_testing_only_batch_posix_fs";

/// Default refresh interval in seconds.
// NOTE(review): presumably applied when `refresh.interval.sec` is not set — confirm at the use site.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
53
54#[serde_as]
55#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
56pub struct FsSourceCommon {
57 #[serde(rename = "refresh.interval.sec")]
58 #[serde_as(as = "Option<DisplayFromStr>")]
59 pub refresh_interval_sec: Option<u64>,
60
61 #[serde(rename = "compression_format", default = "Default::default")]
62 pub compression_format: CompressionFormat,
63
64 #[serde(
65 rename = "parquet.case_insensitive",
66 default,
67 deserialize_with = "deserialize_optional_bool_from_string"
68 )]
69 pub parquet_case_insensitive: Option<bool>,
70}
71
72#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
73pub struct GcsProperties {
74 #[serde(rename = "gcs.bucket_name")]
75 pub bucket_name: String,
76
77 #[serde(rename = "gcs.credential")]
79 pub credential: Option<String>,
80
81 #[serde(rename = "gcs.service_account", default)]
83 pub service_account: Option<String>,
84
85 #[serde(rename = "match_pattern", default)]
86 pub match_pattern: Option<String>,
87
88 #[serde(flatten)]
89 pub fs_common: FsSourceCommon,
90
91 #[serde(flatten)]
92 pub unknown_fields: HashMap<String, String>,
93
94 #[serde(rename = "compression_format", default = "Default::default")]
95 pub compression_format: CompressionFormat,
96}
97
98impl EnforceSecret for GcsProperties {
99 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
100 "gcs.credential",
101 "gcs.service_account"
102 };
103}
104
105impl UnknownFields for GcsProperties {
106 fn unknown_fields(&self) -> HashMap<String, String> {
107 self.unknown_fields.clone()
108 }
109}
110
111impl SourceProperties for GcsProperties {
112 type Split = OpendalFsSplit<OpendalGcs>;
113 type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
114 type SplitReader = OpendalReader<OpendalGcs>;
115
116 const SOURCE_NAME: &'static str = GCS_CONNECTOR;
117}
118
119pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
120 type Properties: SourceProperties + Send + Sync;
121
122 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126pub struct OpendalS3;
127
128impl OpendalSource for OpendalS3 {
129 type Properties = OpendalS3Properties;
130
131 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
132 OpendalEnumerator::new_s3_source(
133 &properties.s3_properties,
134 properties.assume_role,
135 properties.fs_common.compression_format,
136 )
137 }
138}
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq)]
141pub struct OpendalGcs;
142
143impl OpendalSource for OpendalGcs {
144 type Properties = GcsProperties;
145
146 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
147 OpendalEnumerator::new_gcs_source(properties)
148 }
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152pub struct OpendalPosixFs;
153
154impl OpendalSource for OpendalPosixFs {
155 type Properties = PosixFsProperties;
156
157 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
158 OpendalEnumerator::new_posix_fs_source(properties)
159 }
160}
161
162#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
163pub struct OpendalS3Properties {
164 #[serde(flatten)]
165 pub s3_properties: S3PropertiesCommon,
166
167 #[serde(rename = "s3.assume_role", default)]
169 pub assume_role: Option<String>,
170
171 #[serde(flatten)]
172 pub fs_common: FsSourceCommon,
173
174 #[serde(flatten)]
175 pub unknown_fields: HashMap<String, String>,
176}
177
178impl EnforceSecret for OpendalS3Properties {
179 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
180 S3PropertiesCommon::enforce_secret(prop_iter)
181 }
182}
183
184impl UnknownFields for OpendalS3Properties {
185 fn unknown_fields(&self) -> HashMap<String, String> {
186 self.unknown_fields.clone()
187 }
188}
189
190impl SourceProperties for OpendalS3Properties {
191 type Split = OpendalFsSplit<OpendalS3>;
192 type SplitEnumerator = OpendalEnumerator<OpendalS3>;
193 type SplitReader = OpendalReader<OpendalS3>;
194
195 const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
196}
197
198#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
199pub struct PosixFsProperties {
200 #[serde(rename = "posix_fs.root")]
202 pub root: String,
203
204 #[serde(rename = "match_pattern", default)]
206 pub match_pattern: Option<String>,
207
208 #[serde(flatten)]
209 pub fs_common: FsSourceCommon,
210
211 #[serde(flatten)]
212 pub unknown_fields: HashMap<String, String>,
213 #[serde(rename = "compression_format", default = "Default::default")]
214 pub compression_format: CompressionFormat,
215}
216
217impl EnforceSecret for PosixFsProperties {
218 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
219 Ok(())
220 }
221}
222
223impl UnknownFields for PosixFsProperties {
224 fn unknown_fields(&self) -> HashMap<String, String> {
225 self.unknown_fields.clone()
226 }
227}
228
229impl SourceProperties for PosixFsProperties {
230 type Split = OpendalFsSplit<OpendalPosixFs>;
231 type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
232 type SplitReader = OpendalReader<OpendalPosixFs>;
233
234 const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
235}
236
237#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
238pub struct AzblobProperties {
239 #[serde(rename = "azblob.container_name")]
240 pub container_name: String,
241
242 #[serde(rename = "azblob.credentials.account_name", default)]
243 pub account_name: Option<String>,
244 #[serde(rename = "azblob.credentials.account_key", default)]
245 pub account_key: Option<String>,
246 #[serde(rename = "azblob.endpoint_url")]
247 pub endpoint_url: String,
248
249 #[serde(rename = "match_pattern", default)]
250 pub match_pattern: Option<String>,
251
252 #[serde(flatten)]
253 pub fs_common: FsSourceCommon,
254
255 #[serde(flatten)]
256 pub unknown_fields: HashMap<String, String>,
257
258 #[serde(rename = "compression_format", default = "Default::default")]
259 pub compression_format: CompressionFormat,
260}
261
262impl EnforceSecret for AzblobProperties {
263 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
264 "azblob.credentials.account_key",
265 "azblob.credentials.account_name",
266 };
267}
268
269impl UnknownFields for AzblobProperties {
270 fn unknown_fields(&self) -> HashMap<String, String> {
271 self.unknown_fields.clone()
272 }
273}
274
275impl SourceProperties for AzblobProperties {
276 type Split = OpendalFsSplit<OpendalAzblob>;
277 type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
278 type SplitReader = OpendalReader<OpendalAzblob>;
279
280 const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
281}
282
283#[derive(Debug, Clone, Copy, PartialEq, Eq)]
284pub struct OpendalAzblob;
285
286impl OpendalSource for OpendalAzblob {
287 type Properties = AzblobProperties;
288
289 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
290 OpendalEnumerator::new_azblob_source(properties)
291 }
292}
293
294#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
295pub struct BatchPosixFsProperties {
296 #[serde(rename = "batch_posix_fs.root")]
298 pub root: String,
299
300 #[serde(rename = "match_pattern", default)]
302 pub match_pattern: Option<String>,
303
304 #[serde(flatten)]
305 pub fs_common: FsSourceCommon,
306
307 #[serde(flatten)]
308 pub unknown_fields: HashMap<String, String>,
309}
310
311impl EnforceSecret for BatchPosixFsProperties {
312 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
313 Ok(())
314 }
315}
316
317impl UnknownFields for BatchPosixFsProperties {
318 fn unknown_fields(&self) -> HashMap<String, String> {
319 self.unknown_fields.clone()
320 }
321}
322
323impl SourceProperties for BatchPosixFsProperties {
324 type Split = batch_posix_fs_source::BatchPosixFsSplit;
325 type SplitEnumerator = batch_posix_fs_source::BatchPosixFsEnumerator;
326 type SplitReader = batch_posix_fs_source::BatchPosixFsReader;
327
328 const SOURCE_NAME: &'static str = BATCH_POSIX_FS_CONNECTOR;
329}