risingwave_object_store/object/opendal_engine/
s3.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use opendal::Operator;
19use opendal::layers::{HttpClientLayer, LoggingLayer};
20use opendal::raw::HttpClient;
21use opendal::services::S3;
22use risingwave_common::config::ObjectStoreConfig;
23
24use super::{MediaType, OpendalObjectStore, new_operator};
25use crate::object::ObjectResult;
26use crate::object::object_metrics::ObjectStoreMetrics;
27
28impl OpendalObjectStore {
29 pub fn new_s3_engine(
31 bucket: String,
32 config: Arc<ObjectStoreConfig>,
33 metrics: Arc<ObjectStoreMetrics>,
34 ) -> ObjectResult<Self> {
35 let mut builder = S3::default().bucket(&bucket);
37 if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
39 builder = builder.endpoint(&endpoint_url);
40 }
41
42 if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
43 builder = builder.enable_virtual_host_style();
44 }
45
46 let http_client = Self::new_http_client(&config)?;
47
48 let op = new_operator(
49 &config,
50 Operator::new(builder)?
51 .layer(HttpClientLayer::new(http_client))
52 .layer(LoggingLayer::default()),
53 );
54
55 Ok(Self {
56 op,
57 media_type: MediaType::S3,
58 config,
59 metrics,
60 })
61 }
62
63 pub fn new_minio_engine(
65 server: &str,
66 config: Arc<ObjectStoreConfig>,
67 metrics: Arc<ObjectStoreMetrics>,
68 ) -> ObjectResult<Self> {
69 let server = server.strip_prefix("minio://").unwrap();
70 let (access_key_id, rest) = server.split_once(':').unwrap();
71 let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
72
73 let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
74 rest = rest_stripped;
75 "https://"
76 } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
77 rest = rest_stripped;
78 "http://"
79 } else {
80 "http://"
81 };
82 let (address, bucket) = rest.split_once('/').unwrap();
83 let builder = S3::default()
84 .bucket(bucket)
85 .region("custom")
86 .access_key_id(access_key_id)
87 .secret_access_key(secret_access_key)
88 .endpoint(&format!("{}{}", endpoint_prefix, address))
89 .disable_config_load();
90
91 let http_client = Self::new_http_client(&config)?;
92
93 let op: Operator = Operator::new(builder)?
94 .layer(HttpClientLayer::new(http_client))
95 .layer(LoggingLayer::default())
96 .finish();
97
98 Ok(Self {
99 op,
100 media_type: MediaType::Minio,
101 config,
102 metrics,
103 })
104 }
105
106 pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
107 let mut client_builder = reqwest::ClientBuilder::new();
108
109 if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
110 client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
111 }
112
113 if let Some(nodelay) = config.s3.nodelay.as_ref() {
114 client_builder = client_builder.tcp_nodelay(*nodelay);
115 }
116 #[allow(deprecated)]
117 Ok(HttpClient::build(client_builder)?)
118 }
119}