risingwave_connector/sink/file_sink/s3.rs

use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use risingwave_common::util::env_var::env_var_is_true;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::opendal_sink::{BatchingStrategy, FileSink};
use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
use crate::deserialize_optional_bool_from_string;
use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
use crate::source::UnknownFields;

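/// Connection options shared by the S3 and Snowflake file sinks. The
/// `snowflake.*` serde aliases let the Snowflake sink, which writes its files
/// through the same S3-backed operator, accept these settings under its own
/// option names.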
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct S3Common {
    #[serde(rename = "s3.region_name", alias = "snowflake.aws_region")]
    pub region_name: String,
    #[serde(rename = "s3.bucket_name", alias = "snowflake.s3_bucket")]
    pub bucket_name: String,
    #[serde(rename = "s3.path", alias = "snowflake.s3_path", default)]
    pub path: Option<String>,
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,
    #[serde(
        rename = "s3.credentials.access",
        alias = "snowflake.aws_access_key_id",
        default
    )]
    pub access: Option<String>,
    #[serde(
        rename = "s3.credentials.secret",
        alias = "snowflake.aws_secret_access_key",
        default
    )]
    pub secret: Option<String>,
    #[serde(rename = "s3.endpoint_url")]
    pub endpoint_url: Option<String>,
    #[serde(rename = "s3.assume_role", default)]
    pub assume_role: Option<String>,
}

impl S3Common {
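    /// Whether OpenDAL may load credentials and region settings from the
    /// default AWS configuration chain. Loading is forced off when the
    /// `DISABLE_DEFAULT_CREDENTIAL` environment variable is set; otherwise it
    /// defaults to `false` unless explicitly enabled.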
    pub fn enable_config_load(&self) -> bool {
        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
            return false;
        }
        self.enable_config_load.unwrap_or(false)
    }
}

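/// Complete sink configuration: the shared connection options, the flattened
/// batching strategy, the required sink `type`, and a catch-all map of
/// unrecognized options exposed through [`UnknownFields`].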
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct S3Config {
    #[serde(flatten)]
    pub common: S3Common,

    #[serde(flatten)]
    pub batching_strategy: BatchingStrategy,

    pub r#type: String,

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

pub const S3_SINK: &str = "s3";

impl<S: OpendalSinkBackend> FileSink<S> {
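    /// Builds an OpenDAL S3 `Operator` from the sink config: bucket and region
    /// are always set, while endpoint, static credentials, and an assumed role
    /// are applied only when configured. Default AWS config loading is
    /// disabled unless `enable_config_load()` returns true, and the operator
    /// is wrapped with logging and retry layers.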
    pub fn new_s3_sink(config: &S3Config) -> Result<Operator> {
        let mut builder = S3::default()
            .bucket(&config.common.bucket_name)
            .region(&config.common.region_name);

        if let Some(endpoint_url) = &config.common.endpoint_url {
            builder = builder.endpoint(endpoint_url);
        }

        if let Some(access) = &config.common.access {
            builder = builder.access_key_id(access);
        }

        if let Some(secret) = &config.common.secret {
            builder = builder.secret_access_key(secret);
        }

        if let Some(assume_role) = &config.common.assume_role {
            builder = builder.role_arn(assume_role);
        }

        if !config.common.enable_config_load() {
            builder = builder.disable_config_load();
        }

        let operator: Operator = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();

        Ok(operator)
    }
}

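/// Marker backend type for the plain S3 file sink.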
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3Sink;

impl UnknownFields for S3Config {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl OpendalSinkBackend for S3Sink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = S3_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<S3Sink>::new_s3_sink(&properties)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::S3
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}

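/// Marker backend type for the Snowflake sink, which writes files to an S3
/// bucket using the same configuration and operator setup as the S3 sink.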
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnowflakeSink;

pub const SNOWFLAKE_SINK: &str = "snowflake";

impl OpendalSinkBackend for SnowflakeSink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = SNOWFLAKE_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {}, or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<SnowflakeSink>::new_s3_sink(&properties)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::Snowflake
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}
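
// The sketch below is not part of the original file; it illustrates how the
// backend trait is typically driven: `WITH` options collected into a
// `BTreeMap` are validated into an `S3Config` by `from_btreemap`, and
// `new_operator` turns that config into an OpenDAL operator. The option
// values (bucket, path, credentials) are hypothetical placeholders, and the
// test assumes the flattened `BatchingStrategy` fields all carry serde
// defaults so they can be omitted.
#[cfg(test)]
mod example {
    use super::*;

    #[test]
    fn build_s3_operator_from_options() {
        let mut options = BTreeMap::new();
        options.insert("s3.region_name".to_owned(), "us-east-1".to_owned());
        options.insert("s3.bucket_name".to_owned(), "example-bucket".to_owned());
        options.insert("s3.path".to_owned(), "sink-output/".to_owned());
        // Placeholder static credentials so the builder does not rely on the
        // default AWS config chain.
        options.insert("s3.credentials.access".to_owned(), "EXAMPLE_ACCESS_KEY".to_owned());
        options.insert("s3.credentials.secret".to_owned(), "EXAMPLE_SECRET_KEY".to_owned());
        options.insert("type".to_owned(), SINK_TYPE_APPEND_ONLY.to_owned());

        let config = S3Sink::from_btreemap(options).expect("options should deserialize");
        assert_eq!(S3Sink::get_path(config.clone()), "sink-output/");

        // Building the operator only assembles the OpenDAL S3 builder and its
        // logging/retry layers; it does not contact S3.
        let _operator = S3Sink::new_operator(config).expect("operator should build");
    }
}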