risingwave_connector/sink/file_sink/s3.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::collections::{BTreeMap, HashMap};

use anyhow::anyhow;
use opendal::Operator;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::S3;
use risingwave_common::util::env_var::env_var_is_true;
use serde::Deserialize;
use serde_with::serde_as;
use with_options::WithOptions;

use super::opendal_sink::{BatchingStrategy, FileSink};
use crate::connector_common::DISABLE_DEFAULT_CREDENTIAL;
use crate::deserialize_optional_bool_from_string;
use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
use crate::source::UnknownFields;
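
/// Common S3 connection options shared by the S3 and Snowflake file sinks;
/// the `snowflake.*` aliases map onto the same fields.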
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct S3Common {
    #[serde(rename = "s3.region_name", alias = "snowflake.aws_region")]
    pub region_name: String,
    #[serde(rename = "s3.bucket_name", alias = "snowflake.s3_bucket")]
    pub bucket_name: String,
    /// The directory where the sink file is located.
    #[serde(rename = "s3.path", alias = "snowflake.s3_path", default)]
    pub path: Option<String>,
    /// Enable config load. Setting this to `true` loads S3 credentials from the
    /// environment. Only allowed in a self-hosted environment.
    #[serde(default, deserialize_with = "deserialize_optional_bool_from_string")]
    pub enable_config_load: Option<bool>,
    #[serde(
        rename = "s3.credentials.access",
        alias = "snowflake.aws_access_key_id",
        default
    )]
    pub access: Option<String>,
    #[serde(
        rename = "s3.credentials.secret",
        alias = "snowflake.aws_secret_access_key",
        default
    )]
    pub secret: Option<String>,
    #[serde(rename = "s3.endpoint_url")]
    pub endpoint_url: Option<String>,
    #[serde(rename = "s3.assume_role", default)]
    pub assume_role: Option<String>,
}

impl S3Common {
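    /// Resolves whether credentials may be loaded from the environment:
    /// always `false` when `DISABLE_DEFAULT_CREDENTIAL` is set (cloud deployments),
    /// otherwise the user-supplied `enable_config_load`, defaulting to `false`.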
    pub fn enable_config_load(&self) -> bool {
        // If the env var is set to true, the default config load is disabled (cloud environment).
        if env_var_is_true(DISABLE_DEFAULT_CREDENTIAL) {
            return false;
        }
        self.enable_config_load.unwrap_or(false)
    }
}

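/// Options for the S3 (and Snowflake) file sink, deserialized from the sink's
/// `WITH` options. An illustrative (not exhaustive) example:
///
/// ```sql
/// CREATE SINK my_s3_sink FROM mv WITH (
///     connector = 's3',
///     s3.region_name = 'us-east-1',
///     s3.bucket_name = 'my-bucket',
///     s3.path = 'output/',
///     type = 'append-only'
/// );
/// ```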
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct S3Config {
    #[serde(flatten)]
    pub common: S3Common,

    #[serde(flatten)]
    pub batching_strategy: BatchingStrategy,

    pub r#type: String, // accepts "append-only" or "upsert"

    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}

pub const S3_SINK: &str = "s3";

impl<S: OpendalSinkBackend> FileSink<S> {
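    /// Builds an OpenDAL S3 [`Operator`] from the sink config, layered with
    /// logging and retries.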
    pub fn new_s3_sink(config: &S3Config) -> Result<Operator> {
        // Create the S3 builder.
        let mut builder = S3::default()
            .bucket(&config.common.bucket_name)
            .region(&config.common.region_name);

        if let Some(endpoint_url) = &config.common.endpoint_url {
            builder = builder.endpoint(endpoint_url);
        }

        if let Some(access) = &config.common.access {
            builder = builder.access_key_id(access);
        }

        if let Some(secret) = &config.common.secret {
            builder = builder.secret_access_key(secret);
        }

        if let Some(assume_role) = &config.common.assume_role {
            builder = builder.role_arn(assume_role);
        }

        // By default, loading config from the environment is disabled.
        if !config.common.enable_config_load() {
            builder = builder.disable_config_load();
        }

        let operator: Operator = Operator::new(builder)?
            .layer(LoggingLayer::default())
            .layer(RetryLayer::default())
            .finish();

        Ok(operator)
    }
}

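/// Marker type implementing [`OpendalSinkBackend`] for the S3 file sink.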
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct S3Sink;

impl UnknownFields for S3Config {
    fn unknown_fields(&self) -> HashMap<String, String> {
        self.unknown_fields.clone()
    }
}

impl OpendalSinkBackend for S3Sink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = S3_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
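        // Round-trip through serde_json so the rename/alias/flatten attributes
        // on `S3Config` apply to the raw string options.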
        let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<S3Sink>::new_s3_sink(&properties)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::S3
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}

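/// Marker type implementing [`OpendalSinkBackend`] for the Snowflake sink, which
/// stages files in S3 and therefore reuses [`S3Config`] and the S3 operator.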
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct SnowflakeSink;

pub const SNOWFLAKE_SINK: &str = "snowflake";

impl OpendalSinkBackend for SnowflakeSink {
    type Properties = S3Config;

    const SINK_NAME: &'static str = SNOWFLAKE_SINK;

    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
        let config = serde_json::from_value::<S3Config>(serde_json::to_value(btree_map).unwrap())
            .map_err(|e| SinkError::Config(anyhow!(e)))?;
        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
            return Err(SinkError::Config(anyhow!(
                "`{}` must be {} or {}",
                SINK_TYPE_OPTION,
                SINK_TYPE_APPEND_ONLY,
                SINK_TYPE_UPSERT
            )));
        }
        Ok(config)
    }

    fn new_operator(properties: S3Config) -> Result<Operator> {
        FileSink::<SnowflakeSink>::new_s3_sink(&properties)
    }

    fn get_path(properties: Self::Properties) -> String {
        properties.common.path.unwrap_or_default()
    }

    fn get_engine_type() -> super::opendal_sink::EngineType {
        super::opendal_sink::EngineType::Snowflake
    }

    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
        BatchingStrategy {
            max_row_count: properties.batching_strategy.max_row_count,
            rollover_seconds: properties.batching_strategy.rollover_seconds,
            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
        }
    }
}