risingwave_connector/source/filesystem/opendal_source/
mod.rs1use std::collections::HashMap;
16
17pub use opendal_enumerator::OpendalEnumerator;
18
19pub mod azblob_source;
20pub mod gcs_source;
21pub mod posix_fs_source;
22pub mod s3_source;
23
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26use with_options::WithOptions;
27pub mod opendal_enumerator;
28pub mod opendal_reader;
29
30use phf::{Set, phf_set};
31
32use self::opendal_reader::OpendalReader;
33use super::OpendalFsSplit;
34use super::file_common::CompressionFormat;
35pub use super::s3::S3PropertiesCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::source::{SourceProperties, UnknownFields};
39
/// Connector name reported as `SOURCE_NAME` by [`AzblobProperties`].
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name reported as `SOURCE_NAME` by [`GcsProperties`].
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name reported as `SOURCE_NAME` by [`OpendalS3Properties`].
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name reported as `SOURCE_NAME` by [`PosixFsProperties`].
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

/// Default refresh interval in seconds. Presumably this is the fallback when
/// `refresh.interval.sec` is not set (its use is not visible in this module — confirm at call sites).
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
49
/// Options shared by the OpenDAL file-system sources in this module.
///
/// Each source's properties struct embeds this via `#[serde(flatten)]`, so these keys
/// are read from the same option map as the source-specific ones.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    /// Value of `refresh.interval.sec`, parsed from its string form via `DisplayFromStr`.
    // NOTE(review): expected to be `None` when the key is absent (serde_with's `serde_as`
    // defaults `Option` fields); presumably the caller then falls back to
    // `DEFAULT_REFRESH_INTERVAL_SEC` — confirm.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,

    /// Compression format of the source files (`compression_format`);
    /// `Default::default()` when the key is absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
60
/// Properties of the OpenDAL-based Google Cloud Storage source (connector name `gcs`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    /// Bucket to read from (`gcs.bucket_name`). Required: there is no serde default.
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    /// Optional credential (`gcs.credential`). Treated as a secret property.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,

    /// Optional service account (`gcs.service_account`). Treated as a secret property.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,

    /// Optional pattern for selecting objects (`match_pattern`).
    // NOTE(review): presumably a glob applied to object paths by the enumerator — confirm.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Common file-source options (`refresh.interval.sec`, `compression_format`).
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Options not claimed by the fields above; returned by `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    /// Compression format of the source files (`compression_format`).
    // NOTE(review): this key is also declared inside the flattened `fs_common`. Serde most
    // likely assigns it to this top-level field, which would leave `fs_common.compression_format`
    // at its default. Readers for GCS should use this field — confirm before consolidating.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
86
87impl EnforceSecret for GcsProperties {
88 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
89 "gcs.credential",
90 "gcs.service_account"
91 };
92}
93
94impl UnknownFields for GcsProperties {
95 fn unknown_fields(&self) -> HashMap<String, String> {
96 self.unknown_fields.clone()
97 }
98}
99
100impl SourceProperties for GcsProperties {
101 type Split = OpendalFsSplit<OpendalGcs>;
102 type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
103 type SplitReader = OpendalReader<OpendalGcs>;
104
105 const SOURCE_NAME: &'static str = GCS_CONNECTOR;
106}
107
108pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
109 type Properties: SourceProperties + Send + Sync;
110
111 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115pub struct OpendalS3;
116
117impl OpendalSource for OpendalS3 {
118 type Properties = OpendalS3Properties;
119
120 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
121 OpendalEnumerator::new_s3_source(
122 &properties.s3_properties,
123 properties.assume_role,
124 properties.fs_common.compression_format,
125 )
126 }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub struct OpendalGcs;
131
132impl OpendalSource for OpendalGcs {
133 type Properties = GcsProperties;
134
135 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
136 OpendalEnumerator::new_gcs_source(properties)
137 }
138}
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq)]
141pub struct OpendalPosixFs;
142
143impl OpendalSource for OpendalPosixFs {
144 type Properties = PosixFsProperties;
145
146 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
147 OpendalEnumerator::new_posix_fs_source(properties)
148 }
149}
150
151#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
152pub struct OpendalS3Properties {
153 #[serde(flatten)]
154 pub s3_properties: S3PropertiesCommon,
155
156 #[serde(rename = "s3.assume_role", default)]
158 pub assume_role: Option<String>,
159
160 #[serde(flatten)]
161 pub fs_common: FsSourceCommon,
162
163 #[serde(flatten)]
164 pub unknown_fields: HashMap<String, String>,
165}
166
167impl EnforceSecret for OpendalS3Properties {
168 fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
169 S3PropertiesCommon::enforce_secret(prop_iter)
170 }
171}
172
173impl UnknownFields for OpendalS3Properties {
174 fn unknown_fields(&self) -> HashMap<String, String> {
175 self.unknown_fields.clone()
176 }
177}
178
179impl SourceProperties for OpendalS3Properties {
180 type Split = OpendalFsSplit<OpendalS3>;
181 type SplitEnumerator = OpendalEnumerator<OpendalS3>;
182 type SplitReader = OpendalReader<OpendalS3>;
183
184 const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
185}
186
/// Properties of the OpenDAL-based POSIX filesystem source (connector name `posix_fs`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    /// Root path to read from (`posix_fs.root`). Required: there is no serde default.
    // NOTE(review): presumably a directory on the local filesystem — confirm in the enumerator.
    #[serde(rename = "posix_fs.root")]
    pub root: String,

    /// Optional pattern for selecting files (`match_pattern`).
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Common file-source options (`refresh.interval.sec`, `compression_format`).
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Options not claimed by the fields above; returned by `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    /// Compression format of the source files (`compression_format`).
    // NOTE(review): this key is also declared inside the flattened `fs_common`. Serde most
    // likely assigns it to this top-level field, which would leave `fs_common.compression_format`
    // at its default — confirm which one readers use.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
205
206impl EnforceSecret for PosixFsProperties {
207 fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
208 Ok(())
209 }
210}
211
212impl UnknownFields for PosixFsProperties {
213 fn unknown_fields(&self) -> HashMap<String, String> {
214 self.unknown_fields.clone()
215 }
216}
217
218impl SourceProperties for PosixFsProperties {
219 type Split = OpendalFsSplit<OpendalPosixFs>;
220 type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
221 type SplitReader = OpendalReader<OpendalPosixFs>;
222
223 const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
224}
225
/// Properties of the OpenDAL-based `azblob` source (connector name `azblob`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    /// Container to read from (`azblob.container_name`). Required: there is no serde default.
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    /// Optional account name (`azblob.credentials.account_name`). Treated as a secret property.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    /// Optional account key (`azblob.credentials.account_key`). Treated as a secret property.
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    /// Service endpoint URL (`azblob.endpoint_url`). Required: there is no serde default.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    /// Optional pattern for selecting blobs (`match_pattern`).
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Common file-source options (`refresh.interval.sec`, `compression_format`).
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Options not claimed by the fields above; returned by `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    /// Compression format of the source files (`compression_format`).
    // NOTE(review): this key is also declared inside the flattened `fs_common`. Serde most
    // likely assigns it to this top-level field, which would leave `fs_common.compression_format`
    // at its default — confirm which one readers use.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
250
251impl EnforceSecret for AzblobProperties {
252 const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
253 "azblob.credentials.account_key",
254 "azblob.credentials.account_name",
255 };
256}
257
258impl UnknownFields for AzblobProperties {
259 fn unknown_fields(&self) -> HashMap<String, String> {
260 self.unknown_fields.clone()
261 }
262}
263
264impl SourceProperties for AzblobProperties {
265 type Split = OpendalFsSplit<OpendalAzblob>;
266 type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
267 type SplitReader = OpendalReader<OpendalAzblob>;
268
269 const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq)]
273pub struct OpendalAzblob;
274
275impl OpendalSource for OpendalAzblob {
276 type Properties = AzblobProperties;
277
278 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
279 OpendalEnumerator::new_azblob_source(properties)
280 }
281}