risingwave_connector/source/filesystem/opendal_source/mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17pub use opendal_enumerator::OpendalEnumerator;
18
19pub mod azblob_source;
20pub mod gcs_source;
21pub mod posix_fs_source;
22pub mod s3_source;
23
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26use with_options::WithOptions;
27pub mod opendal_enumerator;
28pub mod opendal_reader;
29
30use phf::{Set, phf_set};
31
32use self::opendal_reader::OpendalReader;
33use super::OpendalFsSplit;
34use super::file_common::CompressionFormat;
35pub use super::s3::S3PropertiesCommon;
36use crate::enforce_secret::EnforceSecret;
37use crate::error::{ConnectorError, ConnectorResult};
38use crate::source::{SourceProperties, UnknownFields};
39
/// Connector name of the Azure Blob Storage (`azblob`) source.
pub const AZBLOB_CONNECTOR: &str = "azblob";

/// Connector name of the Google Cloud Storage (`gcs`) source.
pub const GCS_CONNECTOR: &str = "gcs";

/// Connector name of the opendal-backed S3 source.
///
/// Users never type this name directly: they write `connector='s3'`, which the frontend
/// rewrites to `connector='s3_v2'` (in `validate_compatibility`). A user-supplied
/// `connector='s3_v2'` is rejected.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";

/// Connector name of the POSIX file-system (`posix_fs`) source.
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

/// Default refresh interval, in seconds.
///
/// NOTE(review): presumably the fallback when `refresh.interval.sec` is not set; confirm at
/// the use site.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
49
50#[serde_as]
51#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
52pub struct FsSourceCommon {
53    #[serde(rename = "refresh.interval.sec")]
54    #[serde_as(as = "Option<DisplayFromStr>")]
55    pub refresh_interval_sec: Option<u64>,
56}
57
58#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
59pub struct GcsProperties {
60    #[serde(rename = "gcs.bucket_name")]
61    pub bucket_name: String,
62
63    /// The base64 encoded credential key. If not set, ADC will be used.
64    #[serde(rename = "gcs.credential")]
65    pub credential: Option<String>,
66
67    /// If credential/ADC is not set. The service account can be used to provide the credential info.
68    #[serde(rename = "gcs.service_account", default)]
69    pub service_account: Option<String>,
70
71    #[serde(rename = "match_pattern", default)]
72    pub match_pattern: Option<String>,
73
74    #[serde(flatten)]
75    pub fs_common: FsSourceCommon,
76
77    #[serde(flatten)]
78    pub unknown_fields: HashMap<String, String>,
79
80    #[serde(rename = "compression_format", default = "Default::default")]
81    pub compression_format: CompressionFormat,
82}
83
84impl EnforceSecret for GcsProperties {
85    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
86        "gcs.credential",
87        "gcs.service_account"
88    };
89}
90
91impl UnknownFields for GcsProperties {
92    fn unknown_fields(&self) -> HashMap<String, String> {
93        self.unknown_fields.clone()
94    }
95}
96
97impl SourceProperties for GcsProperties {
98    type Split = OpendalFsSplit<OpendalGcs>;
99    type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
100    type SplitReader = OpendalReader<OpendalGcs>;
101
102    const SOURCE_NAME: &'static str = GCS_CONNECTOR;
103}
104
105pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
106    type Properties: SourceProperties + Send + Sync;
107
108    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
109}
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct OpendalS3;
113
114impl OpendalSource for OpendalS3 {
115    type Properties = OpendalS3Properties;
116
117    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
118        OpendalEnumerator::new_s3_source(properties.s3_properties, properties.assume_role)
119    }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub struct OpendalGcs;
124
125impl OpendalSource for OpendalGcs {
126    type Properties = GcsProperties;
127
128    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
129        OpendalEnumerator::new_gcs_source(properties)
130    }
131}
132
133#[derive(Debug, Clone, Copy, PartialEq, Eq)]
134pub struct OpendalPosixFs;
135
136impl OpendalSource for OpendalPosixFs {
137    type Properties = PosixFsProperties;
138
139    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
140        OpendalEnumerator::new_posix_fs_source(properties)
141    }
142}
143
144#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
145pub struct OpendalS3Properties {
146    #[serde(flatten)]
147    pub s3_properties: S3PropertiesCommon,
148
149    /// The following are only supported by `s3_v2` (opendal) source.
150    #[serde(rename = "s3.assume_role", default)]
151    pub assume_role: Option<String>,
152
153    #[serde(flatten)]
154    pub fs_common: FsSourceCommon,
155
156    #[serde(flatten)]
157    pub unknown_fields: HashMap<String, String>,
158}
159
160impl EnforceSecret for OpendalS3Properties {
161    fn enforce_secret<'a>(prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
162        S3PropertiesCommon::enforce_secret(prop_iter)
163    }
164}
165
166impl UnknownFields for OpendalS3Properties {
167    fn unknown_fields(&self) -> HashMap<String, String> {
168        self.unknown_fields.clone()
169    }
170}
171
172impl SourceProperties for OpendalS3Properties {
173    type Split = OpendalFsSplit<OpendalS3>;
174    type SplitEnumerator = OpendalEnumerator<OpendalS3>;
175    type SplitReader = OpendalReader<OpendalS3>;
176
177    const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
178}
179
180#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
181pub struct PosixFsProperties {
182    /// The root directly of the files to search. The files will be searched recursively.
183    #[serde(rename = "posix_fs.root")]
184    pub root: String,
185
186    /// The regex pattern to match files under root directory.
187    #[serde(rename = "match_pattern", default)]
188    pub match_pattern: Option<String>,
189
190    #[serde(flatten)]
191    pub fs_common: FsSourceCommon,
192
193    #[serde(flatten)]
194    pub unknown_fields: HashMap<String, String>,
195    #[serde(rename = "compression_format", default = "Default::default")]
196    pub compression_format: CompressionFormat,
197}
198
199impl EnforceSecret for PosixFsProperties {
200    fn enforce_secret<'a>(_prop_iter: impl Iterator<Item = &'a str>) -> Result<(), ConnectorError> {
201        Ok(())
202    }
203}
204
205impl UnknownFields for PosixFsProperties {
206    fn unknown_fields(&self) -> HashMap<String, String> {
207        self.unknown_fields.clone()
208    }
209}
210
211impl SourceProperties for PosixFsProperties {
212    type Split = OpendalFsSplit<OpendalPosixFs>;
213    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
214    type SplitReader = OpendalReader<OpendalPosixFs>;
215
216    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
217}
218
219#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
220pub struct AzblobProperties {
221    #[serde(rename = "azblob.container_name")]
222    pub container_name: String,
223
224    #[serde(rename = "azblob.credentials.account_name", default)]
225    pub account_name: Option<String>,
226    #[serde(rename = "azblob.credentials.account_key", default)]
227    pub account_key: Option<String>,
228    #[serde(rename = "azblob.endpoint_url")]
229    pub endpoint_url: String,
230
231    #[serde(rename = "match_pattern", default)]
232    pub match_pattern: Option<String>,
233
234    #[serde(flatten)]
235    pub fs_common: FsSourceCommon,
236
237    #[serde(flatten)]
238    pub unknown_fields: HashMap<String, String>,
239
240    #[serde(rename = "compression_format", default = "Default::default")]
241    pub compression_format: CompressionFormat,
242}
243
244impl EnforceSecret for AzblobProperties {
245    const ENFORCE_SECRET_PROPERTIES: Set<&'static str> = phf_set! {
246        "azblob.credentials.account_key",
247        "azblob.credentials.account_name",
248    };
249}
250
251impl UnknownFields for AzblobProperties {
252    fn unknown_fields(&self) -> HashMap<String, String> {
253        self.unknown_fields.clone()
254    }
255}
256
257impl SourceProperties for AzblobProperties {
258    type Split = OpendalFsSplit<OpendalAzblob>;
259    type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
260    type SplitReader = OpendalReader<OpendalAzblob>;
261
262    const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
263}
264
265#[derive(Debug, Clone, Copy, PartialEq, Eq)]
266pub struct OpendalAzblob;
267
268impl OpendalSource for OpendalAzblob {
269    type Properties = AzblobProperties;
270
271    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
272        OpendalEnumerator::new_azblob_source(properties)
273    }
274}