risingwave_object_store/object/opendal_engine/
opendal_s3.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::Arc;
16use std::time::Duration;
17
18use opendal::Operator;
19use opendal::layers::LoggingLayer;
20use opendal::raw::HttpClient;
21use opendal::services::S3;
22use risingwave_common::config::ObjectStoreConfig;
23
24use super::{MediaType, OpendalObjectStore};
25use crate::object::ObjectResult;
26use crate::object::object_metrics::ObjectStoreMetrics;
27
28impl OpendalObjectStore {
29    /// create opendal s3 engine.
30    pub fn new_s3_engine(
31        bucket: String,
32        config: Arc<ObjectStoreConfig>,
33        metrics: Arc<ObjectStoreMetrics>,
34    ) -> ObjectResult<Self> {
35        // Create s3 builder.
36        let mut builder = S3::default().bucket(&bucket);
37        // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field.
38        if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
39            builder = builder.endpoint(&endpoint_url);
40        }
41
42        if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
43            builder = builder.enable_virtual_host_style();
44        }
45
46        let http_client = Self::new_http_client(&config)?;
47        builder = builder.http_client(http_client);
48
49        let op: Operator = Operator::new(builder)?
50            .layer(LoggingLayer::default())
51            .finish();
52
53        Ok(Self {
54            op,
55            media_type: MediaType::S3,
56            config,
57            metrics,
58        })
59    }
60
61    /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
62    pub fn new_minio_engine(
63        server: &str,
64        config: Arc<ObjectStoreConfig>,
65        metrics: Arc<ObjectStoreMetrics>,
66    ) -> ObjectResult<Self> {
67        let server = server.strip_prefix("minio://").unwrap();
68        let (access_key_id, rest) = server.split_once(':').unwrap();
69        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
70
71        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
72            rest = rest_stripped;
73            "https://"
74        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
75            rest = rest_stripped;
76            "http://"
77        } else {
78            "http://"
79        };
80        let (address, bucket) = rest.split_once('/').unwrap();
81        let builder = S3::default()
82            .bucket(bucket)
83            .region("custom")
84            .access_key_id(access_key_id)
85            .secret_access_key(secret_access_key)
86            .endpoint(&format!("{}{}", endpoint_prefix, address))
87            .disable_config_load()
88            .http_client(Self::new_http_client(&config)?);
89        let op: Operator = Operator::new(builder)?
90            .layer(LoggingLayer::default())
91            .finish();
92
93        Ok(Self {
94            op,
95            media_type: MediaType::Minio,
96            config,
97            metrics,
98        })
99    }
100
101    pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
102        let mut client_builder = reqwest::ClientBuilder::new();
103
104        if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
105            client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
106        }
107
108        if let Some(nodelay) = config.s3.nodelay.as_ref() {
109            client_builder = client_builder.tcp_nodelay(*nodelay);
110        }
111
112        Ok(HttpClient::build(client_builder)?)
113    }
114
115    /// currently used by snowflake sink,
116    /// especially when sinking to the intermediate s3 bucket.
117    pub fn new_s3_engine_with_credentials(
118        bucket: &str,
119        config: Arc<ObjectStoreConfig>,
120        metrics: Arc<ObjectStoreMetrics>,
121        aws_access_key_id: &str,
122        aws_secret_access_key: &str,
123        aws_region: &str,
124    ) -> ObjectResult<Self> {
125        // Create s3 builder with credentials.
126        let builder = S3::default()
127            // set credentials for s3 sink
128            .bucket(bucket)
129            .access_key_id(aws_access_key_id)
130            .secret_access_key(aws_secret_access_key)
131            .region(aws_region)
132            .disable_config_load()
133            .http_client(Self::new_http_client(config.as_ref())?);
134
135        let op: Operator = Operator::new(builder)?
136            .layer(LoggingLayer::default())
137            .finish();
138
139        Ok(Self {
140            op,
141            media_type: MediaType::S3,
142            config,
143            metrics,
144        })
145    }
146}