risingwave_connector/source/filesystem/opendal_source/
mod.rs1use std::collections::HashMap;
16
17pub use opendal_enumerator::OpendalEnumerator;
18
19pub mod azblob_source;
20pub mod gcs_source;
21pub mod posix_fs_source;
22pub mod s3_source;
23
24use serde::Deserialize;
25use serde_with::{DisplayFromStr, serde_as};
26use with_options::WithOptions;
27pub mod opendal_enumerator;
28pub mod opendal_reader;
29
30use self::opendal_reader::OpendalReader;
31use super::OpendalFsSplit;
32use super::file_common::CompressionFormat;
33pub use super::s3::S3PropertiesCommon;
34use crate::error::ConnectorResult;
35use crate::source::{SourceProperties, UnknownFields};
36
/// Connector name reported as `SOURCE_NAME` by [`AzblobProperties`].
pub const AZBLOB_CONNECTOR: &str = "azblob";
/// Connector name reported as `SOURCE_NAME` by [`GcsProperties`].
pub const GCS_CONNECTOR: &str = "gcs";
/// Connector name reported as `SOURCE_NAME` by [`OpendalS3Properties`].
pub const OPENDAL_S3_CONNECTOR: &str = "s3_v2";
/// Connector name reported as `SOURCE_NAME` by [`PosixFsProperties`].
pub const POSIX_FS_CONNECTOR: &str = "posix_fs";

/// Default refresh interval, in seconds.
///
/// NOTE(review): not referenced in this module; presumably the fallback used when
/// `refresh.interval.sec` ([`FsSourceCommon::refresh_interval_sec`]) is unset — confirm
/// at the use sites.
pub const DEFAULT_REFRESH_INTERVAL_SEC: u64 = 60;
46
/// Options shared by the OpenDAL-backed file sources in this module.
///
/// Flattened (`#[serde(flatten)]`) into [`GcsProperties`], [`OpendalS3Properties`],
/// [`PosixFsProperties`] and [`AzblobProperties`].
#[serde_as]
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct FsSourceCommon {
    // `refresh.interval.sec`: parsed from its string form through `u64: FromStr`
    // (serde_with `DisplayFromStr`). A missing key presumably yields `None` because
    // serde_with's `serde_as` adds `default` to `Option<_>` fields — confirm if the
    // serde_with version changes.
    #[serde(rename = "refresh.interval.sec")]
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub refresh_interval_sec: Option<u64>,
}
/// Options for the Google Cloud Storage file source; `SOURCE_NAME` is [`GCS_CONNECTOR`].
///
/// NOTE(review): keep `fs_common` declared before `unknown_fields`. serde fills flattened
/// fields in declaration order and a flattened struct consumes the keys it matches, so this
/// order presumably keeps `refresh.interval.sec` out of the catch-all map — confirm before
/// reordering fields.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct GcsProperties {
    // `gcs.bucket_name`: required (not an `Option`, no `default`).
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    // `gcs.credential`: optional.
    // NOTE(review): the derived `Debug` prints this value verbatim — confirm it is treated
    // as a secret wherever these properties are logged.
    #[serde(rename = "gcs.credential")]
    pub credential: Option<String>,

    // `gcs.service_account`: optional; `None` when the key is absent.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: Option<String>,

    // `match_pattern`: optional; `None` when absent. It is interpreted outside this struct
    // (presumably an object-path filter — confirm in the enumerator/reader).
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // Options shared with the other OpenDAL file sources.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Catch-all map for option keys not matched by another field; returned (cloned) by the
    // `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // `compression_format`: falls back to `CompressionFormat::default()` when absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
79
80impl UnknownFields for GcsProperties {
81 fn unknown_fields(&self) -> HashMap<String, String> {
82 self.unknown_fields.clone()
83 }
84}
85
86impl SourceProperties for GcsProperties {
87 type Split = OpendalFsSplit<OpendalGcs>;
88 type SplitEnumerator = OpendalEnumerator<OpendalGcs>;
89 type SplitReader = OpendalReader<OpendalGcs>;
90
91 const SOURCE_NAME: &'static str = GCS_CONNECTOR;
92}
93
94pub trait OpendalSource: Send + Sync + 'static + Clone + PartialEq {
95 type Properties: SourceProperties + Send + Sync;
96
97 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>>;
98}
99
100#[derive(Debug, Clone, Copy, PartialEq, Eq)]
101pub struct OpendalS3;
102
103impl OpendalSource for OpendalS3 {
104 type Properties = OpendalS3Properties;
105
106 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
107 OpendalEnumerator::new_s3_source(properties.s3_properties, properties.assume_role)
108 }
109}
110
111#[derive(Debug, Clone, Copy, PartialEq, Eq)]
112pub struct OpendalGcs;
113
114impl OpendalSource for OpendalGcs {
115 type Properties = GcsProperties;
116
117 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
118 OpendalEnumerator::new_gcs_source(properties)
119 }
120}
121
122#[derive(Debug, Clone, Copy, PartialEq, Eq)]
123pub struct OpendalPosixFs;
124
125impl OpendalSource for OpendalPosixFs {
126 type Properties = PosixFsProperties;
127
128 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
129 OpendalEnumerator::new_posix_fs_source(properties)
130 }
131}
132
/// Options for the OpenDAL-based S3 source; `SOURCE_NAME` is [`OPENDAL_S3_CONNECTOR`].
///
/// NOTE(review): the flattened fields are filled in declaration order and a flattened
/// struct consumes the keys it matches, so `unknown_fields` presumably must stay last to
/// exclude keys owned by `s3_properties`/`fs_common` — confirm before reordering fields.
#[derive(Clone, Debug, Deserialize, PartialEq, with_options::WithOptions)]
pub struct OpendalS3Properties {
    // Common S3 connection options (`S3PropertiesCommon`, re-exported from `super::s3`).
    #[serde(flatten)]
    pub s3_properties: S3PropertiesCommon,

    // `s3.assume_role`: optional (`None` when absent); passed to the enumerator together
    // with `s3_properties`.
    #[serde(rename = "s3.assume_role", default)]
    pub assume_role: Option<String>,

    // Options shared with the other OpenDAL file sources.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Catch-all map for option keys not matched by another field; returned (cloned) by the
    // `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
148
149impl UnknownFields for OpendalS3Properties {
150 fn unknown_fields(&self) -> HashMap<String, String> {
151 self.unknown_fields.clone()
152 }
153}
154
155impl SourceProperties for OpendalS3Properties {
156 type Split = OpendalFsSplit<OpendalS3>;
157 type SplitEnumerator = OpendalEnumerator<OpendalS3>;
158 type SplitReader = OpendalReader<OpendalS3>;
159
160 const SOURCE_NAME: &'static str = OPENDAL_S3_CONNECTOR;
161}
162
/// Options for the POSIX filesystem source; `SOURCE_NAME` is [`POSIX_FS_CONNECTOR`].
///
/// NOTE(review): keep `fs_common` declared before `unknown_fields`. serde fills flattened
/// fields in declaration order and a flattened struct consumes the keys it matches — confirm
/// before reordering fields.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct PosixFsProperties {
    // `posix_fs.root`: required (not an `Option`, no `default`).
    #[serde(rename = "posix_fs.root")]
    pub root: String,

    // `match_pattern`: optional; `None` when absent. It is interpreted outside this struct
    // (presumably a path filter — confirm in the enumerator/reader).
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // Options shared with the other OpenDAL file sources.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Catch-all map for option keys not matched by another field; returned (cloned) by the
    // `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
    // `compression_format`: falls back to `CompressionFormat::default()` when absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
181
182impl UnknownFields for PosixFsProperties {
183 fn unknown_fields(&self) -> HashMap<String, String> {
184 self.unknown_fields.clone()
185 }
186}
187
188impl SourceProperties for PosixFsProperties {
189 type Split = OpendalFsSplit<OpendalPosixFs>;
190 type SplitEnumerator = OpendalEnumerator<OpendalPosixFs>;
191 type SplitReader = OpendalReader<OpendalPosixFs>;
192
193 const SOURCE_NAME: &'static str = POSIX_FS_CONNECTOR;
194}
195
/// Options for the Azure Blob Storage source; `SOURCE_NAME` is [`AZBLOB_CONNECTOR`].
///
/// NOTE(review): keep `fs_common` declared before `unknown_fields`. serde fills flattened
/// fields in declaration order and a flattened struct consumes the keys it matches — confirm
/// before reordering fields.
#[derive(Clone, Debug, Deserialize, PartialEq, WithOptions)]
pub struct AzblobProperties {
    // `azblob.container_name`: required (not an `Option`, no `default`).
    #[serde(rename = "azblob.container_name")]
    pub container_name: String,

    // `azblob.credentials.account_name`: optional; `None` when absent.
    #[serde(rename = "azblob.credentials.account_name", default)]
    pub account_name: Option<String>,
    // `azblob.credentials.account_key`: optional; `None` when absent.
    // NOTE(review): the derived `Debug` prints this key verbatim — confirm it is treated as a
    // secret wherever these properties are logged.
    #[serde(rename = "azblob.credentials.account_key", default)]
    pub account_key: Option<String>,
    // `azblob.endpoint_url`: required (not an `Option`, no `default`).
    #[serde(rename = "azblob.endpoint_url")]
    pub endpoint_url: String,

    // `match_pattern`: optional; `None` when absent. It is interpreted outside this struct
    // (presumably a blob-path filter — confirm in the enumerator/reader).
    #[serde(rename = "match_pattern", default)]
    pub match_pattern: Option<String>,

    // Options shared with the other OpenDAL file sources.
    #[serde(flatten)]
    pub fs_common: FsSourceCommon,

    // Catch-all map for option keys not matched by another field; returned (cloned) by the
    // `UnknownFields` impl.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,

    // `compression_format`: falls back to `CompressionFormat::default()` when absent.
    #[serde(rename = "compression_format", default = "Default::default")]
    pub compression_format: CompressionFormat,
}
220
221impl UnknownFields for AzblobProperties {
222 fn unknown_fields(&self) -> HashMap<String, String> {
223 self.unknown_fields.clone()
224 }
225}
226
227impl SourceProperties for AzblobProperties {
228 type Split = OpendalFsSplit<OpendalAzblob>;
229 type SplitEnumerator = OpendalEnumerator<OpendalAzblob>;
230 type SplitReader = OpendalReader<OpendalAzblob>;
231
232 const SOURCE_NAME: &'static str = AZBLOB_CONNECTOR;
233}
234
235#[derive(Debug, Clone, Copy, PartialEq, Eq)]
236pub struct OpendalAzblob;
237
238impl OpendalSource for OpendalAzblob {
239 type Properties = AzblobProperties;
240
241 fn new_enumerator(properties: Self::Properties) -> ConnectorResult<OpendalEnumerator<Self>> {
242 OpendalEnumerator::new_azblob_source(properties)
243 }
244}