// risingwave_connector/sink/file_sink/s3.rs

use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use risingwave_common::util::env_var::env_var_is_true;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::opendal_sink::{BatchingStrategy, FileSink};
use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
use crate::deserialize_optional_bool_from_string;
use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
use crate::source::UnknownFields;

#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct S3Common {
    #[serde(rename = "s3.region_name", alias = "snowflake.aws_region")]
    pub region_name: String,
    #[serde(rename = "s3.bucket_name", alias = "snowflake.s3_bucket")]
    pub bucket_name: String,
    #[serde(rename = "s3.path", alias = "snowflake.s3_path", default)]
    pub path: Option<String>,
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,
    #[serde(
        rename = "s3.credentials.access",
        alias = "snowflake.aws_access_key_id",
        default
    )]
    pub access: Option<String>,
    #[serde(
        rename = "s3.credentials.secret",
        alias = "snowflake.aws_secret_access_key",
        default
    )]
    pub secret: Option<String>,
    #[serde(rename = "s3.endpoint_url")]
    pub endpoint_url: Option<String>,
    #[serde(rename = "s3.assume_role", default)]
    pub assume_role: Option<String>,
}
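
// Illustrative sketch (not part of the original file): the serde `rename`s
// above are the option keys a user passes in a `CREATE SINK ... WITH (...)`
// clause. With placeholder values it might look like:
//
//     CREATE SINK my_s3_sink FROM my_mv WITH (
//         connector = 's3',                  -- S3_SINK below
//         s3.region_name = 'us-east-1',      -- placeholder
//         s3.bucket_name = 'my-bucket',      -- placeholder
//         s3.path = 'output/',               -- placeholder
//         type = 'append-only'               -- SINK_TYPE_APPEND_ONLY
//     );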

impl S3Common {
    pub fn enable_config_load(&self) -> bool {
        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
            return false;
        }
        self.enable_config_load.unwrap_or(false)
    }
}
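
// Precedence implemented above, spelled out with hypothetical values: the
// `DISABLE_DEFAULT_CREDENTIAL` environment variable always wins over the
// per-sink option, and the option itself defaults to `false` when unset.
//
//     // With DISABLE_DEFAULT_CREDENTIAL=true in the environment:
//     //   common.enable_config_load() == false, even for Some(true).
//     // With the env var unset or false:
//     //   common.enable_config_load() == common.enable_config_load.unwrap_or(false).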

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct S3Config {
    #[serde(flatten)]
    pub common: S3Common,

    #[serde(flatten)]
    pub batching_strategy: BatchingStrategy,

    pub r#type: String,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

pub const S3_SINK: &str = "s3";

impl<S: OpendalSinkBackend> FileSink<S> {
    pub fn new_s3_sink(common: &S3Common) -> Result<Operator> {
        let mut builder = S3::default()
            .bucket(&common.bucket_name)
            .region(&common.region_name);

        if let Some(endpoint_url) = &common.endpoint_url {
            builder = builder.endpoint(endpoint_url);
        }

        if let Some(access) = &common.access {
            builder = builder.access_key_id(access);
        }

        if let Some(secret) = &common.secret {
            builder = builder.secret_access_key(secret);
        }

        if let Some(assume_role) = &common.assume_role {
            builder = builder.role_arn(assume_role);
        }

        if !common.enable_config_load() {
            builder = builder.disable_config_load();
        }

        let operator: Operator = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();

        Ok(operator)
    }
}
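
// Minimal usage sketch (hypothetical; `common` stands for an `S3Common` parsed
// from user options). The resulting `opendal::Operator` exposes the usual
// async object-store API:
//
//     let op = FileSink::<S3Sink>::new_s3_sink(&common)?;
//     op.write("path/to/object", data).await?;   // write bytes to the bucket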

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3Sink;

impl UnknownFields for S3Config {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl OpendalSinkBackend for S3Sink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = S3_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }
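
    // Example (hypothetical keys and values): `from_btreemap` round-trips the
    // WITH options through `serde_json`, so keys matching the serde renames
    // populate `S3Config` and anything else lands in `unknown_fields`:
    //
    //     let mut opts = BTreeMap::new();
    //     opts.insert("s3.region_name".into(), "us-east-1".into());
    //     opts.insert("s3.bucket_name".into(), "my-bucket".into());
    //     opts.insert("type".into(), "append-only".into());
    //     let config = S3Sink::from_btreemap(opts)?;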

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<S3Sink>::new_s3_sink(&properties.common)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::S3
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}

#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct SnowflakeConfig {
    #[serde(flatten)]
    pub inner: S3Config,
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnowflakeSink;

pub const SNOWFLAKE_SINK: &str = "snowflake";

impl OpendalSinkBackend for SnowflakeSink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = SNOWFLAKE_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config =
            serde_json::from_value::<SnowflakeConfig>(serde_json::to_value(btree_map).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?
                .inner;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<SnowflakeSink>::new_s3_sink(&properties.common)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::Snowflake
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}
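
// Illustrative sketch (not part of the original file): `SnowflakeSink` reuses
// the S3 configuration through the `snowflake.*` serde aliases declared on
// `S3Common`, so with placeholder values a sink definition could read:
//
//     CREATE SINK snow_sink FROM my_mv WITH (
//         connector = 'snowflake',                -- SNOWFLAKE_SINK above
//         snowflake.aws_region = 'us-east-1',     -- alias of s3.region_name
//         snowflake.s3_bucket = 'my-bucket',      -- alias of s3.bucket_name
//         type = 'append-only'
//     );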