risingwave_connector/sink/file_sink/s3.rs

// Copyright 2024 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use risingwave_common::util::env_var::env_var_is_true;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::opendal_sink::{BatchingStrategy, FileSink};
use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
use crate::deserialize_optional_bool_from_string;
use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
use crate::source::UnknownFields;

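/// Connection options shared by the S3 file sink and the Snowflake sink, which
/// writes through the same S3 backend. The `serde` renames give the `s3.*`
/// option names; the aliases accept the corresponding `snowflake.*` spellings.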
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct S3Common {
    #[serde(rename = "s3.region_name", alias = "snowflake.aws_region")]
    pub region_name: String,
    #[serde(rename = "s3.bucket_name", alias = "snowflake.s3_bucket")]
    pub bucket_name: String,
    /// The directory where the sink file is located.
    #[serde(rename = "s3.path", alias = "snowflake.s3_path", default)]
    pub path: Option<String>,
    /// Enable config load. When set to `true`, S3 credentials are loaded from the
    /// environment. Only allowed in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,
    #[serde(
        rename = "s3.credentials.access",
        alias = "snowflake.aws_access_key_id",
        default
    )]
    pub access: Option<String>,
    #[serde(
        rename = "s3.credentials.secret",
        alias = "snowflake.aws_secret_access_key",
        default
    )]
    pub secret: Option<String>,
    #[serde(rename = "s3.endpoint_url")]
    pub endpoint_url: Option<String>,
    #[serde(rename = "s3.assume_role", default)]
    pub assume_role: Option<String>,
}

impl S3Common {
    pub fn enable_config_load(&self) -> bool {
        // If the env var is set to true, we disable the default config load. (Cloud environment)
        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
            return false;
        }
        self.enable_config_load.unwrap_or(false)
    }
}

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct S3Config {
    #[serde(flatten)]
    pub common: S3Common,

    #[serde(flatten)]
    pub batching_strategy: BatchingStrategy,

    pub r#type: String, // accept "append-only" or "upsert"

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

pub const S3_SINK: &str = "s3";

impl<S: OpendalSinkBackend> FileSink<S> {
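    /// Build an OpenDAL `Operator` for the S3 backend from the user-supplied
    /// options. Explicit endpoint, credentials, and role ARN are applied only
    /// when present; whether credentials may also be loaded from the
    /// environment is governed by `S3Common::enable_config_load`.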
    pub fn new_s3_sink(common: &S3Common) -> Result<Operator> {
        // Create s3 builder.
        let mut builder = S3::default()
            .bucket(&common.bucket_name)
            .region(&common.region_name);

        if let Some(endpoint_url) = &common.endpoint_url {
            builder = builder.endpoint(endpoint_url);
        }

        if let Some(access) = &common.access {
            builder = builder.access_key_id(access);
        }

        if let Some(secret) = &common.secret {
            builder = builder.secret_access_key(secret);
        }

        if let Some(assume_role) = &common.assume_role {
            builder = builder.role_arn(assume_role);
        }
        // By default, loading config from the environment is disabled.
        if !common.enable_config_load() {
            builder = builder.disable_config_load();
        }

        let operator: Operator = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();

        Ok(operator)
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3Sink;

impl UnknownFields for S3Config {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl OpendalSinkBackend for S3Sink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = S3_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<S3Sink>::new_s3_sink(&properties.common)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::S3
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}

// Wrapper that expands `S3Config` into the Snowflake sink's config type.
#[serde_as]
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct SnowflakeConfig {
    #[serde(flatten)]
    pub inner: S3Config,
}

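// The Snowflake sink is a thin variant of the S3 file sink: it writes files
// into an S3 bucket (typically one backing a Snowflake external stage, from
// which Snowflake ingests them), so it reuses `S3Config` and the S3 operator.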
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnowflakeSink;

pub const SNOWFLAKE_SINK: &str = "snowflake";

impl OpendalSinkBackend for SnowflakeSink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = SNOWFLAKE_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config =
            serde_json::from_value::<SnowflakeConfig>(serde_json::to_value(btree_map).unwrap())
                .map_err(|e| SinkError::Config(anyhow!(e)))?
                .inner;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<SnowflakeSink>::new_s3_sink(&properties.common)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::Snowflake
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}
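
#[cfg(test)]
mod tests {
    use super::*;

    // Illustrative sketch of `from_btreemap` validation, not an exhaustive
    // test. Assumptions: every `BatchingStrategy` field has a serde default,
    // so only the required S3 options plus `type` are needed to parse; the
    // region and bucket values below are hypothetical sample values.
    #[test]
    fn s3_sink_validates_sink_type() {
        let mut props = BTreeMap::new();
        props.insert("s3.region_name".to_owned(), "us-east-1".to_owned());
        props.insert("s3.bucket_name".to_owned(), "example-bucket".to_owned());

        // A supported `type` parses successfully.
        props.insert("type".to_owned(), SINK_TYPE_APPEND_ONLY.to_owned());
        assert!(S3Sink::from_btreemap(props.clone()).is_ok());

        // Any `type` other than append-only/upsert is rejected.
        props.insert("type".to_owned(), "debezium".to_owned());
        assert!(S3Sink::from_btreemap(props).is_err());
    }
}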