risingwave_connector/source/filesystem/opendal_source/
mod.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17pub use opendal_enumerator::OpendalEnumerator;
18
19pub mod azblob_source;
20pub mod gcs_source;
21pub mod posix_fs_source;
22pub mod s3_source;
23
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26use with_options::WithOptions;
27pub mod opendal_enumerator;
28pub mod opendal_reader;
29
30use self::opendal_reader::OpendalReader;
31use super::OpendalFsSplit;
32use super::file_common::CompressionFormat;
33pub use super::s3::S3PropertiesCommon;
34use crate::error::ConnectorResult;
35use crate::source::{SourceProperties, UnknownFields};
36
/// Connector name reported by the OpenDAL-based Azure Blob source.
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name reported by the OpenDAL-based GCS source.
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name reported by the OpenDAL-based S3 source; the new `s3_v2` uses opendal.
///
/// Users write `connector='s3'`, which the frontend converts to `connector='s3_v2'`
/// (in `validate_compatibility`). A user-supplied `connector='s3_v2'` is rejected.
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name reported by the OpenDAL-based POSIX file-system source.
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

/// Refresh interval in seconds.
///
/// NOTE(review): presumably the fallback when `refresh.interval.sec` is not set —
/// confirm at the use site.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
46
/// Options shared by every OpenDAL file source in this module.
///
/// Each source's properties struct embeds this with `#[serde(flatten)]`.
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    // Supplied as a string option and parsed into `u64` through `DisplayFromStr`.
    // NOTE(review): presumably `None` when the key is absent (serde_as defaults
    // `Option` fields), with callers falling back to `DEFAULT_REFRESH_INTERVAL_SEC` — confirm.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,
}
54#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
55pub struct GcsProperties {
56    #[serde(rename = "gcs.bucket_name")]
57    pub bucket_name: String,
58
59    /// The base64 encoded credential key. If not set, ADC will be used.
60    #[serde(rename = "gcs.credential")]
61    pub credential: Option<String>,
62
63    /// If credential/ADC is not set. The service account can be used to provide the credential info.
64    #[serde(rename = "gcs.service_account", default)]
65    pub service_account: Option<String>,
66
67    #[serde(rename = "match_pattern", default)]
68    pub match_pattern: Option<String>,
69
70    #[serde(flatten)]
71    pub fs_common: FsSourceCommon,
72
73    #[serde(flatten)]
74    pub unknown_fields: HashMap<String, String>,
75
76    #[serde(rename = "compression_format", default = "Default::default")]
77    pub compression_format: CompressionFormat,
78}
79
80impl UnknownFields for GcsProperties {
81    fn unknown_fields(&self) -> HashMap<String, String> {
82        self.unknown_fields.clone()
83    }
84}
85
86impl SourceProperties for GcsProperties {
87    type Split = OpendalFsSplit<OpendalGcs>;
88    type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
89    type SplitReader = OpendalReader<OpendalGcs>;
90
91    const SOURCE_NAME: &'static str = GCS_CONNECTOR;
92}
93
94pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
95    type Properties: SourceProperties + Send + Sync;
96
97    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101pub struct OpendalS3;
102
103impl OpendalSource for OpendalS3 {
104    type Properties = OpendalS3Properties;
105
106    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
107        OpendalEnumerator::new_s3_source(properties.s3_properties, properties.assume_role)
108    }
109}
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct OpendalGcs;
113
114impl OpendalSource for OpendalGcs {
115    type Properties = GcsProperties;
116
117    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
118        OpendalEnumerator::new_gcs_source(properties)
119    }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub struct OpendalPosixFs;
124
125impl OpendalSource for OpendalPosixFs {
126    type Properties = PosixFsProperties;
127
128    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
129        OpendalEnumerator::new_posix_fs_source(properties)
130    }
131}
132
133#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
134pub struct OpendalS3Properties {
135    #[serde(flatten)]
136    pub s3_properties: S3PropertiesCommon,
137
138    /// The following are only supported by `s3_v2` (opendal) source.
139    #[serde(rename = "s3.assume_role", default)]
140    pub assume_role: Option<String>,
141
142    #[serde(flatten)]
143    pub fs_common: FsSourceCommon,
144
145    #[serde(flatten)]
146    pub unknown_fields: HashMap<String, String>,
147}
148
149impl UnknownFields for OpendalS3Properties {
150    fn unknown_fields(&self) -> HashMap<String, String> {
151        self.unknown_fields.clone()
152    }
153}
154
155impl SourceProperties for OpendalS3Properties {
156    type Split = OpendalFsSplit<OpendalS3>;
157    type SplitEnumerator = OpendalEnumerator<OpendalS3>;
158    type SplitReader = OpendalReader<OpendalS3>;
159
160    const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
161}
162
163#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
164pub struct PosixFsProperties {
165    /// The root directly of the files to search. The files will be searched recursively.
166    #[serde(rename = "posix_fs.root")]
167    pub root: String,
168
169    /// The regex pattern to match files under root directory.
170    #[serde(rename = "match_pattern", default)]
171    pub match_pattern: Option<String>,
172
173    #[serde(flatten)]
174    pub fs_common: FsSourceCommon,
175
176    #[serde(flatten)]
177    pub unknown_fields: HashMap<String, String>,
178    #[serde(rename = "compression_format", default = "Default::default")]
179    pub compression_format: CompressionFormat,
180}
181
182impl UnknownFields for PosixFsProperties {
183    fn unknown_fields(&self) -> HashMap<String, String> {
184        self.unknown_fields.clone()
185    }
186}
187
188impl SourceProperties for PosixFsProperties {
189    type Split = OpendalFsSplit<OpendalPosixFs>;
190    type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
191    type SplitReader = OpendalReader<OpendalPosixFs>;
192
193    const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
194}
195
/// Connector properties for the OpenDAL-based Azure Blob source (`azblob`).
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    // Optional account credentials; each is `None` when omitted (serde `default`).
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    // Required: not an `Option` and has no serde `default`.
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Catch-all for the remaining options; exposed through the `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
220
221impl UnknownFields for AzblobProperties {
222    fn unknown_fields(&self) -> HashMap<String, String> {
223        self.unknown_fields.clone()
224    }
225}
226
227impl SourceProperties for AzblobProperties {
228    type Split = OpendalFsSplit<OpendalAzblob>;
229    type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
230    type SplitReader = OpendalReader<OpendalAzblob>;
231
232    const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
233}
234
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
236pub struct OpendalAzblob;
237
238impl OpendalSource for OpendalAzblob {
239    type Properties = AzblobProperties;
240
241    fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
242        OpendalEnumerator::new_azblob_source(properties)
243    }
244}