risingwave_connector/source/filesystem/opendal_source/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17pub use batch_posix_fs_source::{BatchPosixFsEnumerator, BatchPosixFsReader, BatchPosixFsSplit};
18pub use opendal_enumerator::OpendalEnumerator;
19
20pub mod azblob_source;
21pub mod batch_posix_fs_source;
22pub mod gcs_source;
23pub mod posix_fs_source;
24pub mod s3_source;
25
26use serde::Deserialize;
27use serde_with::{DisplayFromStr, serde_as};
28use with_options::WithOptions;
29pub mod opendal_enumerator;
30pub mod opendal_reader;
31
32use phf::{Set, phf_set};
33
34use self::opendal_reader::OpendalReader;
35use super::OpendalFsSplit;
36use super::file_common::CompressionFormat;
37pub use super::s3::S3PropertiesCommon;
38use crate::enforce_secret::EnforceSecret;
39use crate::error::{ConnectorError, ConnectorResult};
40use crate::source::{SourceProperties, UnknownFields};
41
/// Connector name of the Azure Blob source; used as `AzblobProperties::SOURCE_NAME`.
pub const AZBLOB_CONNECTOR: &'static str = "azblob";
/// Connector name of the GCS source; used as `GcsProperties::SOURCE_NAME`.
pub const GCS_CONNECTOR: &'static str = "gcs";
/// Connector name of the opendal-based S3 source (`s3_v2`).
///
/// Users write `connector='s3'`; the frontend rewrites it to `connector='s3_v2'`
/// (in `validate_compatibility`). A user-supplied `connector='s3_v2'` is rejected.
pub const OPENDAL_S3_CONNECTOR: &'static str = "s3_v2";
/// Connector name of the POSIX file system source; used as `PosixFsProperties::SOURCE_NAME`.
pub const POSIX_FS_CONNECTOR: &'static str = "posix_fs";
/// Connector name of the batch POSIX file system source. As the value says, it is meant for
/// testing only; used as `BatchPosixFsProperties::SOURCE_NAME`.
pub const BATCH_POSIX_FS_CONNECTOR: &'static str = "__for_testing_only_batch_posix_fs";

/// Default refresh interval, in seconds.
// NOTE(review): presumably applied when `refresh.interval.sec` is not set — confirm at the use site.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
52
/// Options shared by the file source property structs in this module.
///
/// Each of them embeds this struct as a `#[serde(flatten)]` field named `fs_common`.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    /// Optional `refresh.interval.sec` option, deserialized from its string form via
    /// `DisplayFromStr`.
    // NOTE(review): unit is seconds per the key name; presumably falls back to
    // `DEFAULT_REFRESH_INTERVAL_SEC` when `None` — confirm at the use site.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,

    /// The `compression_format` option; `CompressionFormat::default()` when the key is absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
63
/// Properties of the `gcs` connector (see `GCS_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    /// Required `gcs.bucket_name` option.
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    /// The base64 encoded credential key. If not set, ADC will be used.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,

    /// If credential/ADC is not set. The service account can be used to provide the credential info.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,

    /// Optional `match_pattern` option.
    // NOTE(review): pattern syntax (regex vs. glob) is decided by the enumerator, not visible here.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Options shared with the other file sources, flattened into this struct's key space.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Remaining option entries collected through `#[serde(flatten)]`; exposed through
    /// `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // NOTE(review): `fs_common` (`FsSourceCommon`) also declares a field for the same
    // `compression_format` key. Confirm which of the two the GCS enumerator reads and whether
    // serde populates both; the flattened copy may only ever see the default.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
89
// Declares the GCS option keys that make up `ENFORCE_SECRET_PROPERTIES`.
// NOTE(review): presumably the trait's default `enforce_secret` rejects these keys when they are
// not supplied as secrets — confirm against the `EnforceSecret` trait definition.
impl EnforceSecret for GcsProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "gcs.credential",
        "gcs.service_account"
    };
}
96
97impl UnknownFields for GcsProperties {
98    fn unknown_fields(&self) -> HashMap<String, String> {
99        self.unknown_fields.clone()
100    }
101}
102
103impl SourceProperties for GcsProperties {
104    type Split = OpendalFsSplit<OpendalGcs>;
105    type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
106    type SplitReader = OpendalReader<OpendalGcs>;
107
108    const SOURCE_NAME: &'static str = GCS_CONNECTOR;
109}
110
111pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
112    type Properties: SourceProperties + Send + Sync;
113
114    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
115}
116
117#[derive(Debug, Clone, Copy, PartialEq, Eq)]
118pub struct OpendalS3;
119
120impl OpendalSource for OpendalS3 {
121    type Properties = OpendalS3Properties;
122
123    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
124        OpendalEnumerator::new_s3_source(
125            &properties.s3_properties,
126            properties.assume_role,
127            properties.fs_common.compression_format,
128        )
129    }
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq)]
133pub struct OpendalGcs;
134
135impl OpendalSource for OpendalGcs {
136    type Properties = GcsProperties;
137
138    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
139        OpendalEnumerator::new_gcs_source(properties)
140    }
141}
142
143#[derive(Debug, Clone, Copy, PartialEq, Eq)]
144pub struct OpendalPosixFs;
145
146impl OpendalSource for OpendalPosixFs {
147    type Properties = PosixFsProperties;
148
149    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
150        OpendalEnumerator::new_posix_fs_source(properties)
151    }
152}
153
154#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
155pub struct OpendalS3Properties {
156    #[serde(flatten)]
157    pub s3_properties: S3PropertiesCommon,
158
159    /// The following are only supported by `s3_v2` (opendal) source.
160    #[serde(rename = "s3.assume_role", default)]
161    pub assume_role: Option<String>,
162
163    #[serde(flatten)]
164    pub fs_common: FsSourceCommon,
165
166    #[serde(flatten)]
167    pub unknown_fields: HashMap<String, String>,
168}
169
impl EnforceSecret for OpendalS3Properties {
    /// Delegates the check entirely to `S3PropertiesCommon::enforce_secret`; nothing is
    /// checked here for `s3.assume_role` or the shared file source options.
    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
        S3PropertiesCommon::enforce_secret(prop_iter)
    }
}
175
176impl UnknownFields for OpendalS3Properties {
177    fn unknown_fields(&self) -> HashMap<String, String> {
178        self.unknown_fields.clone()
179    }
180}
181
182impl SourceProperties for OpendalS3Properties {
183    type Split = OpendalFsSplit<OpendalS3>;
184    type SplitEnumerator = OpendalEnumerator<OpendalS3>;
185    type SplitReader = OpendalReader<OpendalS3>;
186
187    const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
188}
189
/// Properties of the `posix_fs` connector (see `POSIX_FS_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    /// The root directory of the files to search. The files will be searched recursively.
    #[serde(rename = "posix_fs.root")]
    pub root: String,

    /// The regex pattern to match files under root directory.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    // NOTE(review): `fs_common` (`FsSourceCommon`) also declares a field for the same
    // `compression_format` key. Confirm which of the two the POSIX fs enumerator reads and
    // whether serde populates both.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
208
impl EnforceSecret for PosixFsProperties {
    /// Accepts every property: the iterator is ignored and `Ok(())` is always returned, so no
    /// option of this connector is treated as a secret by this impl.
    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
        Ok(())
    }
}
214
215impl UnknownFields for PosixFsProperties {
216    fn unknown_fields(&self) -> HashMap<String, String> {
217        self.unknown_fields.clone()
218    }
219}
220
221impl SourceProperties for PosixFsProperties {
222    type Split = OpendalFsSplit<OpendalPosixFs>;
223    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
224    type SplitReader = OpendalReader<OpendalPosixFs>;
225
226    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
227}
228
/// Properties of the `azblob` connector (see `AZBLOB_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    /// Required `azblob.container_name` option.
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    /// Optional `azblob.credentials.account_name` option.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    /// Optional `azblob.credentials.account_key` option.
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    /// Required `azblob.endpoint_url` option.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    /// Optional `match_pattern` option.
    // NOTE(review): pattern syntax (regex vs. glob) is decided by the enumerator, not visible here.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Options shared with the other file sources, flattened into this struct's key space.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Remaining option entries collected through `#[serde(flatten)]`; exposed through
    /// `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // NOTE(review): `fs_common` (`FsSourceCommon`) also declares a field for the same
    // `compression_format` key. Confirm which of the two the Azblob enumerator reads and whether
    // serde populates both.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
253
// Declares the Azblob option keys that make up `ENFORCE_SECRET_PROPERTIES`: both the account
// key and the account name.
// NOTE(review): presumably the trait's default `enforce_secret` rejects these keys when they are
// not supplied as secrets — confirm against the `EnforceSecret` trait definition.
impl EnforceSecret for AzblobProperties {
    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
        "azblob.credentials.account_key",
        "azblob.credentials.account_name",
    };
}
260
261impl UnknownFields for AzblobProperties {
262    fn unknown_fields(&self) -> HashMap<String, String> {
263        self.unknown_fields.clone()
264    }
265}
266
267impl SourceProperties for AzblobProperties {
268    type Split = OpendalFsSplit<OpendalAzblob>;
269    type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
270    type SplitReader = OpendalReader<OpendalAzblob>;
271
272    const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
273}
274
275#[derive(Debug, Clone, Copy, PartialEq, Eq)]
276pub struct OpendalAzblob;
277
278impl OpendalSource for OpendalAzblob {
279    type Properties = AzblobProperties;
280
281    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
282        OpendalEnumerator::new_azblob_source(properties)
283    }
284}
285
/// Properties of the `__for_testing_only_batch_posix_fs` connector
/// (see `BATCH_POSIX_FS_CONNECTOR`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct BatchPosixFsProperties {
    /// The root directory of the files to search. The files will be searched recursively.
    #[serde(rename = "batch_posix_fs.root")]
    pub root: String,

    /// The glob pattern to match files under root directory.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    /// Options shared with the other file sources, flattened into this struct's key space.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    /// Remaining option entries collected through `#[serde(flatten)]`; exposed through
    /// `UnknownFields::unknown_fields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
302
impl EnforceSecret for BatchPosixFsProperties {
    /// Accepts every property: the iterator is ignored and `Ok(())` is always returned, so no
    /// option of this connector is treated as a secret by this impl.
    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
        Ok(())
    }
}
308
309impl UnknownFields for BatchPosixFsProperties {
310    fn unknown_fields(&self) -> HashMap<String, String> {
311        self.unknown_fields.clone()
312    }
313}
314
315impl SourceProperties for BatchPosixFsProperties {
316    type Split = batch_posix_fs_source::BatchPosixFsSplit;
317    type SplitEnumerator = batch_posix_fs_source::BatchPosixFsEnumerator;
318    type SplitReader = batch_posix_fs_source::BatchPosixFsReader;
319
320    const SOURCE_NAME: &'static str = BATCH_POSIX_FS_CONNECTOR;
321}