// File: risingwave_connector/source/filesystem/opendal_source/mod.rs
mod.rs1use std::collections::HashMap;
16
17pub use batch_posix_fs_source::{BatchPosixFsEnumerator, BatchPosixFsReader, BatchPosixFsSplit};
18pub use opendal_enumerator::OpendalEnumerator;
19
20pub mod azblob_source;
21pub mod batch_posix_fs_source;
22pub mod gcs_source;
23pub mod posix_fs_source;
24pub mod s3_source;
25
26use serde::Deserialize;
27use serde_with::{DisplayFromStr, serde_as};
28use with_options::WithOptions;
29pub mod opendal_enumerator;
30pub mod opendal_reader;
31
32use phf::{Set, phf_set};
33
34use self::opendal_reader::OpendalReader;
35use super::OpendalFsSplit;
36use super::file_common::CompressionFormat;
37pub use super::s3::S3PropertiesCommon;
38use crate::enforce_secret::EnforceSecret;
39use crate::error::{ConnectorError, ConnectorResult};
40use crate::source::{SourceProperties, UnknownFields};
41
/// Connector name of the OpenDAL-backed S3 source ([`OpendalS3Properties`]).
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name of the Google Cloud Storage source ([`GcsProperties`]).
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name of the Azure Blob Storage source ([`AzblobProperties`]).
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name of the POSIX file-system source ([`PosixFsProperties`]).
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
/// Connector name of the batch POSIX file-system source ([`BatchPosixFsProperties`]).
/// As the name itself states, it exists for testing only.
pub const BATCH_POSIX_FS_CONNECTOR: &str = "__for_testing_only_batch_posix_fs";

/// Default refresh interval, in seconds, matching the unit of the
/// `refresh.interval.sec` option.
/// NOTE(review): the place where this default is applied is not in this module — confirm.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;

53#[serde_as]
54#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
55pub struct FsSourceCommon {
56 #[serde(rename = "refresh.interval.sec")]
57 #[serde_as(as = "Option<DisplayFromStr>")]
58 pub refresh_interval_sec: Option<u64>,
59
60 #[serde(rename = "compression_format", default = "Default::default")]
61 pub compression_format: CompressionFormat,
62}
63
64#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
65pub struct GcsProperties {
66 #[serde(rename = "gcs.bucket_name")]
67 pub bucket_name: String,
68
69 #[serde(rename = "gcs.credential")]
71 pub credential: Option<String>,
72
73 #[serde(rename = "gcs.service_account", default)]
75 pub service_account: Option<String>,
76
77 #[serde(rename = "match_pattern", default)]
78 pub match_pattern: Option<String>,
79
80 #[serde(flatten)]
81 pub fs_common: FsSourceCommon,
82
83 #[serde(flatten)]
84 pub unknown_fields: HashMap<String, String>,
85
86 #[serde(rename = "compression_format", default = "Default::default")]
87 pub compression_format: CompressionFormat,
88}
89
90impl EnforceSecret for GcsProperties {
91 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
92 "gcs.credential",
93 "gcs.service_account"
94 };
95}
96
97impl UnknownFields for GcsProperties {
98 fn unknown_fields(&self) -> HashMap<String, String> {
99 self.unknown_fields.clone()
100 }
101}
102
103impl SourceProperties for GcsProperties {
104 type Split = OpendalFsSplit<OpendalGcs>;
105 type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
106 type SplitReader = OpendalReader<OpendalGcs>;
107
108 const SOURCE_NAME: &'static str = GCS_CONNECTOR;
109}
110
111pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
112 type Properties: SourceProperties + Send + Sync;
113
114 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
115}
116
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub struct OpendalS3;
119
120impl OpendalSource for OpendalS3 {
121 type Properties = OpendalS3Properties;
122
123 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
124 OpendalEnumerator::new_s3_source(
125 &properties.s3_properties,
126 properties.assume_role,
127 properties.fs_common.compression_format,
128 )
129 }
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub struct OpendalGcs;
134
135impl OpendalSource for OpendalGcs {
136 type Properties = GcsProperties;
137
138 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
139 OpendalEnumerator::new_gcs_source(properties)
140 }
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq)]
144pub struct OpendalPosixFs;
145
146impl OpendalSource for OpendalPosixFs {
147 type Properties = PosixFsProperties;
148
149 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
150 OpendalEnumerator::new_posix_fs_source(properties)
151 }
152}
153
154#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
155pub struct OpendalS3Properties {
156 #[serde(flatten)]
157 pub s3_properties: S3PropertiesCommon,
158
159 #[serde(rename = "s3.assume_role", default)]
161 pub assume_role: Option<String>,
162
163 #[serde(flatten)]
164 pub fs_common: FsSourceCommon,
165
166 #[serde(flatten)]
167 pub unknown_fields: HashMap<String, String>,
168}
169
170impl EnforceSecret for OpendalS3Properties {
171 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
172 S3PropertiesCommon::enforce_secret(prop_iter)
173 }
174}
175
176impl UnknownFields for OpendalS3Properties {
177 fn unknown_fields(&self) -> HashMap<String, String> {
178 self.unknown_fields.clone()
179 }
180}
181
182impl SourceProperties for OpendalS3Properties {
183 type Split = OpendalFsSplit<OpendalS3>;
184 type SplitEnumerator = OpendalEnumerator<OpendalS3>;
185 type SplitReader = OpendalReader<OpendalS3>;
186
187 const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
188}
189
190#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
191pub struct PosixFsProperties {
192 #[serde(rename = "posix_fs.root")]
194 pub root: String,
195
196 #[serde(rename = "match_pattern", default)]
198 pub match_pattern: Option<String>,
199
200 #[serde(flatten)]
201 pub fs_common: FsSourceCommon,
202
203 #[serde(flatten)]
204 pub unknown_fields: HashMap<String, String>,
205 #[serde(rename = "compression_format", default = "Default::default")]
206 pub compression_format: CompressionFormat,
207}
208
209impl EnforceSecret for PosixFsProperties {
210 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
211 Ok(())
212 }
213}
214
215impl UnknownFields for PosixFsProperties {
216 fn unknown_fields(&self) -> HashMap<String, String> {
217 self.unknown_fields.clone()
218 }
219}
220
221impl SourceProperties for PosixFsProperties {
222 type Split = OpendalFsSplit<OpendalPosixFs>;
223 type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
224 type SplitReader = OpendalReader<OpendalPosixFs>;
225
226 const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
227}
228
229#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
230pub struct AzblobProperties {
231 #[serde(rename = "azblob.container_name")]
232 pub container_name: String,
233
234 #[serde(rename = "azblob.credentials.account_name", default)]
235 pub account_name: Option<String>,
236 #[serde(rename = "azblob.credentials.account_key", default)]
237 pub account_key: Option<String>,
238 #[serde(rename = "azblob.endpoint_url")]
239 pub endpoint_url: String,
240
241 #[serde(rename = "match_pattern", default)]
242 pub match_pattern: Option<String>,
243
244 #[serde(flatten)]
245 pub fs_common: FsSourceCommon,
246
247 #[serde(flatten)]
248 pub unknown_fields: HashMap<String, String>,
249
250 #[serde(rename = "compression_format", default = "Default::default")]
251 pub compression_format: CompressionFormat,
252}
253
254impl EnforceSecret for AzblobProperties {
255 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
256 "azblob.credentials.account_key",
257 "azblob.credentials.account_name",
258 };
259}
260
261impl UnknownFields for AzblobProperties {
262 fn unknown_fields(&self) -> HashMap<String, String> {
263 self.unknown_fields.clone()
264 }
265}
266
267impl SourceProperties for AzblobProperties {
268 type Split = OpendalFsSplit<OpendalAzblob>;
269 type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
270 type SplitReader = OpendalReader<OpendalAzblob>;
271
272 const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq)]
276pub struct OpendalAzblob;
277
278impl OpendalSource for OpendalAzblob {
279 type Properties = AzblobProperties;
280
281 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
282 OpendalEnumerator::new_azblob_source(properties)
283 }
284}
285
286#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
287pub struct BatchPosixFsProperties {
288 #[serde(rename = "batch_posix_fs.root")]
290 pub root: String,
291
292 #[serde(rename = "match_pattern", default)]
294 pub match_pattern: Option<String>,
295
296 #[serde(flatten)]
297 pub fs_common: FsSourceCommon,
298
299 #[serde(flatten)]
300 pub unknown_fields: HashMap<String, String>,
301}
302
303impl EnforceSecret for BatchPosixFsProperties {
304 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
305 Ok(())
306 }
307}
308
309impl UnknownFields for BatchPosixFsProperties {
310 fn unknown_fields(&self) -> HashMap<String, String> {
311 self.unknown_fields.clone()
312 }
313}
314
315impl SourceProperties for BatchPosixFsProperties {
316 type Split = batch_posix_fs_source::BatchPosixFsSplit;
317 type SplitEnumerator = batch_posix_fs_source::BatchPosixFsEnumerator;
318 type SplitReader = batch_posix_fs_source::BatchPosixFsReader;
319
320 const SOURCE_NAME: &'static str = BATCH_POSIX_FS_CONNECTOR;
321}