risingwave_connector/source/filesystem/opendal_source/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17pub use opendal_enumerator::OpendalEnumerator;
18
19pub mod azblob_source;
20pub mod gcs_source;
21pub mod posix_fs_source;
22pub mod s3_source;
23
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26use with_options::WithOptions;
27pub mod opendal_enumerator;
28pub mod opendal_reader;
29
30use phf::{Set, phf_set};
31
32use self::opendal_reader::OpendalReader;
33use super::OpendalFsSplit;
34use super::file_common::CompressionFormat;
35pub use super::s3::S3PropertiesCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::source::{SourceProperties, UnknownFields};
39
/// Connector name of the Azure Blob Storage source.
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name of the Google Cloud Storage source.
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name of the local (POSIX) filesystem source.
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";
/// Connector name of the new, opendal-backed S3 source.
///
/// Users write `connector='s3'`; the frontend rewrites it to `connector='s3_v2'`
/// (in `validate_compatibility`). A user-supplied `connector='s3_v2'` is rejected.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";

/// Default refresh interval, in seconds.
///
/// Presumably the fallback when a source omits `refresh.interval.sec`; the consumer
/// lives outside this module.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
49
/// Options shared by the filesystem-style sources in this module.
///
/// Each per-connector property struct embeds this via `#[serde(flatten)]`.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    /// Refresh interval in seconds, deserialized from its string form via `DisplayFromStr`.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,

    /// Compression format of the source files; `CompressionFormat::default()` when absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
60
/// WITH-options of the `gcs` (Google Cloud Storage) source connector.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    /// Name of the GCS bucket to read from.
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    /// The base64 encoded credential key. If not set, ADC will be used.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,

    /// If neither the credential nor ADC is set, the service account can be used to provide
    /// the credential info.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,

    /// Pattern that files in the bucket are matched against.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Leftover options not claimed by the fields above; returned by `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // NOTE(review): `compression_format` is also declared inside the flattened `fs_common`.
    // Under serde's flatten rules the named field here most likely consumes the key, leaving
    // `fs_common.compression_format` at its default. Confirm which copy the GCS enumerator and
    // reader read before removing either one.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
86
87impl EnforceSecret for GcsProperties {
88    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
89        "gcs.credential",
90        "gcs.service_account"
91    };
92}
93
94impl UnknownFields for GcsProperties {
95    fn unknown_fields(&self) -> HashMap<String, String> {
96        self.unknown_fields.clone()
97    }
98}
99
100impl SourceProperties for GcsProperties {
101    type Split = OpendalFsSplit<OpendalGcs>;
102    type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
103    type SplitReader = OpendalReader<OpendalGcs>;
104
105    const SOURCE_NAME: &'static str = GCS_CONNECTOR;
106}
107
108pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
109    type Properties: SourceProperties + Send + Sync;
110
111    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
112}
113
114#[derive(Debug, Clone, Copy, PartialEq, Eq)]
115pub struct OpendalS3;
116
117impl OpendalSource for OpendalS3 {
118    type Properties = OpendalS3Properties;
119
120    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
121        OpendalEnumerator::new_s3_source(
122            &properties.s3_properties,
123            properties.assume_role,
124            properties.fs_common.compression_format,
125        )
126    }
127}
128
129#[derive(Debug, Clone, Copy, PartialEq, Eq)]
130pub struct OpendalGcs;
131
132impl OpendalSource for OpendalGcs {
133    type Properties = GcsProperties;
134
135    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
136        OpendalEnumerator::new_gcs_source(properties)
137    }
138}
139
140#[derive(Debug, Clone, Copy, PartialEq, Eq)]
141pub struct OpendalPosixFs;
142
143impl OpendalSource for OpendalPosixFs {
144    type Properties = PosixFsProperties;
145
146    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
147        OpendalEnumerator::new_posix_fs_source(properties)
148    }
149}
150
/// WITH-options of the opendal-backed S3 source (`s3_v2`).
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
    // Common S3 options (`S3PropertiesCommon`, re-exported from `super::s3`), flattened in.
    #[serde(flatten)]
    pub s3_properties: S3PropertiesCommon,

    /// Role to assume when accessing S3. Only supported by the `s3_v2` (opendal) source.
    #[serde(rename = "s3.assume_role", default)]
    pub assume_role: Option<String>,

    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Leftover options not claimed by the fields above; returned by `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
166
167impl EnforceSecret for OpendalS3Properties {
168    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
169        S3PropertiesCommon::enforce_secret(prop_iter)
170    }
171}
172
173impl UnknownFields for OpendalS3Properties {
174    fn unknown_fields(&self) -> HashMap<String, String> {
175        self.unknown_fields.clone()
176    }
177}
178
179impl SourceProperties for OpendalS3Properties {
180    type Split = OpendalFsSplit<OpendalS3>;
181    type SplitEnumerator = OpendalEnumerator<OpendalS3>;
182    type SplitReader = OpendalReader<OpendalS3>;
183
184    const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
185}
186
/// WITH-options of the `posix_fs` (local filesystem) source connector.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    /// The root directory of the files to search. The files will be searched recursively.
    #[serde(rename = "posix_fs.root")]
    pub root: String,

    /// The regex pattern to match files under root directory.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Leftover options not claimed by the fields above; returned by `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    // NOTE(review): duplicates `fs_common.compression_format`; the named field here most likely
    // consumes the key, leaving the flattened copy at its default. Confirm which copy the
    // posix_fs enumerator/reader reads before removing either.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
205
206impl EnforceSecret for PosixFsProperties {
207    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
208        Ok(())
209    }
210}
211
212impl UnknownFields for PosixFsProperties {
213    fn unknown_fields(&self) -> HashMap<String, String> {
214        self.unknown_fields.clone()
215    }
216}
217
218impl SourceProperties for PosixFsProperties {
219    type Split = OpendalFsSplit<OpendalPosixFs>;
220    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
221    type SplitReader = OpendalReader<OpendalPosixFs>;
222
223    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
224}
225
/// WITH-options of the `azblob` (Azure Blob Storage) source connector.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    /// Name of the container to read from.
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    /// Storage account name; optional.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    /// Storage account key; optional.
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    /// Endpoint URL of the blob service; required.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    /// Pattern that files in the container are matched against.
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Leftover options not claimed by the fields above; returned by `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // NOTE(review): duplicates `fs_common.compression_format`; the named field here most likely
    // consumes the key, leaving the flattened copy at its default. Confirm which copy the
    // azblob enumerator/reader reads before removing either.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
250
251impl EnforceSecret for AzblobProperties {
252    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
253        "azblob.credentials.account_key",
254        "azblob.credentials.account_name",
255    };
256}
257
258impl UnknownFields for AzblobProperties {
259    fn unknown_fields(&self) -> HashMap<String, String> {
260        self.unknown_fields.clone()
261    }
262}
263
264impl SourceProperties for AzblobProperties {
265    type Split = OpendalFsSplit<OpendalAzblob>;
266    type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
267    type SplitReader = OpendalReader<OpendalAzblob>;
268
269    const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
270}
271
272#[derive(Debug, Clone, Copy, PartialEq, Eq)]
273pub struct OpendalAzblob;
274
275impl OpendalSource for OpendalAzblob {
276    type Properties = AzblobProperties;
277
278    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
279        OpendalEnumerator::new_azblob_source(properties)
280    }
281}