risingwave_connector/source/filesystem/opendal_source/mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17pub use batch_posix_fs_source::{BatchPosixFsEnumerator, BatchPosixFsReader, BatchPosixFsSplit};
18pub use opendal_enumerator::OpendalEnumerator;
19
20pub mod azblob_source;
21pub mod batch_posix_fs_source;
22pub mod gcs_source;
23pub mod posix_fs_source;
24pub mod s3_source;
25
26use serde::Deserialize;
27use serde_with::{DisplayFromStr, serde_as};
28use with_options::WithOptions;
29pub mod opendal_enumerator;
30pub mod opendal_reader;
31
32use phf::{Set, phf_set};
33
34use self::opendal_reader::OpendalReader;
35use super::OpendalFsSplit;
36use super::file_common::CompressionFormat;
37pub use super::s3::S3PropertiesCommon;
38use crate::deserialize_optional_bool_from_string;
39use crate::enforce_secret::EnforceSecret;
40use crate::error::{ConnectorError, ConnectorResult};
41use crate::source::{SourceProperties, UnknownFields};
42
/// Connector name of the Azure Blob Storage source (used as `AzblobProperties::SOURCE_NAME`).
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name of the Google Cloud Storage source (used as `GcsProperties::SOURCE_NAME`).
pub const GCS_CONNECTOR: &str = "gcs";
/// The new `s3_v2` will use opendal.
/// Note: user uses `connector='s3'`, which is converted to `connector='s3_v2'` in frontend (in `validate_compatibility`).
/// If user inputs `connector='s3_v2'`, it will be rejected.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name of the POSIX file-system source (used as `PosixFsProperties::SOURCE_NAME`).
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
/// Connector name of the test-only batch POSIX file-system source
/// (used as `BatchPosixFsProperties::SOURCE_NAME`).
pub const BATCH_POSIX_FS_CONNECTOR: &str = "__for_testing_only_batch_posix_fs";

/// Default value, in seconds, for the `refresh.interval.sec` option.
/// NOTE(review): presumably applied when `FsSourceCommon::refresh_interval_sec` is `None`;
/// the use site is outside this file — confirm.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
53
/// Options shared by the file-system style sources in this module.
///
/// Every properties struct in this module embeds this type with `#[serde(flatten)]`, so its
/// keys appear at the top level of the connector options rather than under a nested key.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    // Parsed from its string form via `DisplayFromStr`.
    // NOTE(review): presumably `DEFAULT_REFRESH_INTERVAL_SEC` is used when unset — confirm.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,

    // Falls back to `CompressionFormat::default()` when the key is absent.
    // NOTE(review): structs that also declare an outer `compression_format` field
    // (GCS, Azblob, posix_fs) claim this key first, so this copy stays at the default there.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,

    // Decoded by `deserialize_optional_bool_from_string` (presumably accepts string-encoded
    // booleans); `None` when the key is absent because of `default`.
    #[serde(
        rename = "parquet.case_insensitive",
        default,
        deserialize_with = "deserialize_optional_bool_from_string"
    )]
    pub parquet_case_insensitive: Option<bool>,
}
71
/// Options accepted by the `gcs` source (`GCS_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    /// The base64 encoded credential key. If not set, ADC will be used.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,

    /// If credential/ADC is not set. The service account can be used to provide the credential info.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,

    // Optional filter on object paths.
    // NOTE(review): `PosixFsProperties` documents this option as a regex while
    // `BatchPosixFsProperties` calls it a glob; confirm the syntax the enumerator applies.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // Must stay before `unknown_fields`: serde feeds flattened fields in declaration order,
    // so the struct claims its keys before the catch-all map sees what is left.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Every key not claimed by another field; exposed through `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // NOTE(review): `FsSourceCommon` (flattened into `fs_common`) also declares
    // `compression_format`. serde matches the key against this outer field, so it is never
    // forwarded to the flattened struct and `fs_common.compression_format` stays at its
    // default for GCS. Code reading the GCS compression format must use this field;
    // consider dropping the duplicate once all readers are confirmed.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
97
/// Option keys of the `gcs` source that are subject to secret enforcement.
/// NOTE(review): the exact enforcement rule lives in the `EnforceSecret` trait's default
/// `enforce_secret`, which is outside this file — confirm there.
impl EnforceSecret for GcsProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.credential",
        "gcs.service_account"
    };
}
104
105impl UnknownFields for GcsProperties {
106    fn unknown_fields(&self) -> HashMap<String, String> {
107        self.unknown_fields.clone()
108    }
109}
110
111impl SourceProperties for GcsProperties {
112    type Split = OpendalFsSplit<OpendalGcs>;
113    type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
114    type SplitReader = OpendalReader<OpendalGcs>;
115
116    const SOURCE_NAME: &'static str = GCS_CONNECTOR;
117}
118
119pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
120    type Properties: SourceProperties + Send + Sync;
121
122    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
123}
124
125#[derive(Debug, Clone, Copy, PartialEq, Eq)]
126pub struct OpendalS3;
127
128impl OpendalSource for OpendalS3 {
129    type Properties = OpendalS3Properties;
130
131    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
132        OpendalEnumerator::new_s3_source(
133            &properties.s3_properties,
134            properties.assume_role,
135            properties.fs_common.compression_format,
136        )
137    }
138}
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq)]
141pub struct OpendalGcs;
142
143impl OpendalSource for OpendalGcs {
144    type Properties = GcsProperties;
145
146    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
147        OpendalEnumerator::new_gcs_source(properties)
148    }
149}
150
151#[derive(Debug, Clone, Copy, PartialEq, Eq)]
152pub struct OpendalPosixFs;
153
154impl OpendalSource for OpendalPosixFs {
155    type Properties = PosixFsProperties;
156
157    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
158        OpendalEnumerator::new_posix_fs_source(properties)
159    }
160}
161
162#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
163pub struct OpendalS3Properties {
164    #[serde(flatten)]
165    pub s3_properties: S3PropertiesCommon,
166
167    /// The following are only supported by `s3_v2` (opendal) source.
168    #[serde(rename = "s3.assume_role", default)]
169    pub assume_role: Option<String>,
170
171    #[serde(flatten)]
172    pub fs_common: FsSourceCommon,
173
174    #[serde(flatten)]
175    pub unknown_fields: HashMap<String, String>,
176}
177
178impl EnforceSecret for OpendalS3Properties {
179    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
180        S3PropertiesCommon::enforce_secret(prop_iter)
181    }
182}
183
184impl UnknownFields for OpendalS3Properties {
185    fn unknown_fields(&self) -> HashMap<String, String> {
186        self.unknown_fields.clone()
187    }
188}
189
190impl SourceProperties for OpendalS3Properties {
191    type Split = OpendalFsSplit<OpendalS3>;
192    type SplitEnumerator = OpendalEnumerator<OpendalS3>;
193    type SplitReader = OpendalReader<OpendalS3>;
194
195    const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
196}
197
/// Options accepted by the `posix_fs` source (`POSIX_FS_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    /// The root directory of the files to search. The files will be searched recursively.
    #[serde(rename = "posix_fs.root")]
    pub root: String,

    // NOTE(review): `BatchPosixFsProperties` documents the same option as a glob pattern;
    // confirm which syntax the enumerator actually applies.
    /// The regex pattern to match files under root directory.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // Must stay before `unknown_fields`: serde feeds flattened fields in declaration order,
    // so the struct claims its keys before the catch-all map sees what is left.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Every key not claimed by another field; exposed through `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    // NOTE(review): `FsSourceCommon` (flattened into `fs_common`) also declares
    // `compression_format`. serde matches the key against this outer field, so
    // `fs_common.compression_format` stays at its default for posix_fs. Readers must use
    // this field; consider dropping the duplicate once all readers are confirmed.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
216
217impl EnforceSecret for PosixFsProperties {
218    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
219        Ok(())
220    }
221}
222
223impl UnknownFields for PosixFsProperties {
224    fn unknown_fields(&self) -> HashMap<String, String> {
225        self.unknown_fields.clone()
226    }
227}
228
229impl SourceProperties for PosixFsProperties {
230    type Split = OpendalFsSplit<OpendalPosixFs>;
231    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
232    type SplitReader = OpendalReader<OpendalPosixFs>;
233
234    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
235}
236
/// Options accepted by the `azblob` source (`AZBLOB_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    // Optional account credentials; both keys are listed in this type's
    // `ENFORCE_SECRET_PROPERTIES`.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    // Required: no `default`, so deserialization fails when the key is missing.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    // Optional filter on blob paths.
    // NOTE(review): regex vs glob is documented inconsistently across this module; confirm.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // Must stay before `unknown_fields`: serde feeds flattened fields in declaration order,
    // so the struct claims its keys before the catch-all map sees what is left.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Every key not claimed by another field; exposed through `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // NOTE(review): `FsSourceCommon` (flattened into `fs_common`) also declares
    // `compression_format`. serde matches the key against this outer field, so
    // `fs_common.compression_format` stays at its default for Azblob. Readers must use
    // this field; consider dropping the duplicate once all readers are confirmed.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
261
/// Option keys of the `azblob` source that are subject to secret enforcement.
/// NOTE(review): the exact enforcement rule lives in the `EnforceSecret` trait's default
/// `enforce_secret`, which is outside this file — confirm there.
impl EnforceSecret for AzblobProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "azblob.credentials.account_key",
        "azblob.credentials.account_name",
    };
}
268
269impl UnknownFields for AzblobProperties {
270    fn unknown_fields(&self) -> HashMap<String, String> {
271        self.unknown_fields.clone()
272    }
273}
274
275impl SourceProperties for AzblobProperties {
276    type Split = OpendalFsSplit<OpendalAzblob>;
277    type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
278    type SplitReader = OpendalReader<OpendalAzblob>;
279
280    const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
281}
282
283#[derive(Debug, Clone, Copy, PartialEq, Eq)]
284pub struct OpendalAzblob;
285
286impl OpendalSource for OpendalAzblob {
287    type Properties = AzblobProperties;
288
289    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
290        OpendalEnumerator::new_azblob_source(properties)
291    }
292}
293
/// Options accepted by the test-only batch POSIX file-system source
/// (`BATCH_POSIX_FS_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct BatchPosixFsProperties {
    /// The root directory of the files to search. The files will be searched recursively.
    #[serde(rename = "batch_posix_fs.root")]
    pub root: String,

    // NOTE(review): `PosixFsProperties` documents the same option as a regex; confirm
    // which syntax each enumerator applies.
    /// The glob pattern to match files under root directory.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // No outer `compression_format` field here, so `fs_common.compression_format` receives
    // the user's value. Must stay before `unknown_fields` so it claims its keys first.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Every key not claimed by another field; exposed through `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
310
311impl EnforceSecret for BatchPosixFsProperties {
312    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
313        Ok(())
314    }
315}
316
317impl UnknownFields for BatchPosixFsProperties {
318    fn unknown_fields(&self) -> HashMap<String, String> {
319        self.unknown_fields.clone()
320    }
321}
322
323impl SourceProperties for BatchPosixFsProperties {
324    type Split = batch_posix_fs_source::BatchPosixFsSplit;
325    type SplitEnumerator = batch_posix_fs_source::BatchPosixFsEnumerator;
326    type SplitReader = batch_posix_fs_source::BatchPosixFsReader;
327
328    const SOURCE_NAME: &'static str = BATCH_POSIX_FS_CONNECTOR;
329}