risingwave_object_store/object/opendal_engine/
s3.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::time::Duration;
17
18use opendal::Operator;
19use opendal::layers::{HttpClientLayer, LoggingLayer};
20use opendal::raw::HttpClient;
21use opendal::services::S3;
22use risingwave_common::config::ObjectStoreConfig;
23
24use super::{MediaType, OpendalObjectStore, new_operator};
25use crate::object::ObjectResult;
26use crate::object::object_metrics::ObjectStoreMetrics;
27
28impl OpendalObjectStore {
29    /// create opendal s3 engine.
30    pub fn new_s3_engine(
31        bucket: String,
32        config: Arc<ObjectStoreConfig>,
33        metrics: Arc<ObjectStoreMetrics>,
34    ) -> ObjectResult<Self> {
35        // Create s3 builder.
36        let mut builder = S3::default().bucket(&bucket);
37        // For AWS S3, there is no need to set an endpoint; for other S3 compatible object stores, it is necessary to set this field.
38        if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
39            builder = builder.endpoint(&endpoint_url);
40        }
41
42        if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
43            builder = builder.enable_virtual_host_style();
44        }
45
46        let http_client = Self::new_http_client(&config)?;
47
48        let op = new_operator(
49            &config,
50            Operator::new(builder)?
51                .layer(HttpClientLayer::new(http_client))
52                .layer(LoggingLayer::default()),
53        );
54
55        Ok(Self {
56            op,
57            media_type: MediaType::S3,
58            config,
59            metrics,
60        })
61    }
62
63    /// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
64    pub fn new_minio_engine(
65        server: &str,
66        config: Arc<ObjectStoreConfig>,
67        metrics: Arc<ObjectStoreMetrics>,
68    ) -> ObjectResult<Self> {
69        let server = server.strip_prefix("minio://").unwrap();
70        let (access_key_id, rest) = server.split_once(':').unwrap();
71        let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
72
73        let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
74            rest = rest_stripped;
75            "https://"
76        } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
77            rest = rest_stripped;
78            "http://"
79        } else {
80            "http://"
81        };
82        let (address, bucket) = rest.split_once('/').unwrap();
83        let builder = S3::default()
84            .bucket(bucket)
85            .region("custom")
86            .access_key_id(access_key_id)
87            .secret_access_key(secret_access_key)
88            .endpoint(&format!("{}{}", endpoint_prefix, address))
89            .disable_config_load();
90
91        let http_client = Self::new_http_client(&config)?;
92
93        let op: Operator = Operator::new(builder)?
94            .layer(HttpClientLayer::new(http_client))
95            .layer(LoggingLayer::default())
96            .finish();
97
98        Ok(Self {
99            op,
100            media_type: MediaType::Minio,
101            config,
102            metrics,
103        })
104    }
105
106    pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
107        let mut client_builder = reqwest::ClientBuilder::new();
108
109        if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
110            client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
111        }
112
113        if let Some(nodelay) = config.s3.nodelay.as_ref() {
114            client_builder = client_builder.tcp_nodelay(*nodelay);
115        }
116        #[allow(deprecated)]
117        Ok(HttpClient::build(client_builder)?)
118    }
119}