risingwave_object_store/object/opendal_engine/
opendal_s3.rs1use std::sync::Arc;
16use std::time::Duration;
17
18use opendal::Operator;
19use opendal::layers::LoggingLayer;
20use opendal::raw::HttpClient;
21use opendal::services::S3;
22use risingwave_common::config::ObjectStoreConfig;
23
24use super::{MediaType, OpendalObjectStore};
25use crate::object::ObjectResult;
26use crate::object::object_metrics::ObjectStoreMetrics;
27
28impl OpendalObjectStore {
29 pub fn new_s3_engine(
31 bucket: String,
32 config: Arc<ObjectStoreConfig>,
33 metrics: Arc<ObjectStoreMetrics>,
34 ) -> ObjectResult<Self> {
35 let mut builder = S3::default().bucket(&bucket);
37 if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
39 builder = builder.endpoint(&endpoint_url);
40 }
41
42 if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
43 builder = builder.enable_virtual_host_style();
44 }
45
46 let http_client = Self::new_http_client(&config)?;
47 builder = builder.http_client(http_client);
48
49 let op: Operator = Operator::new(builder)?
50 .layer(LoggingLayer::default())
51 .finish();
52
53 Ok(Self {
54 op,
55 media_type: MediaType::S3,
56 config,
57 metrics,
58 })
59 }
60
61 pub fn new_minio_engine(
63 server: &str,
64 config: Arc<ObjectStoreConfig>,
65 metrics: Arc<ObjectStoreMetrics>,
66 ) -> ObjectResult<Self> {
67 let server = server.strip_prefix("minio://").unwrap();
68 let (access_key_id, rest) = server.split_once(':').unwrap();
69 let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
70
71 let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
72 rest = rest_stripped;
73 "https://"
74 } else if let Some(rest_stripped) = rest.strip_prefix("http://") {
75 rest = rest_stripped;
76 "http://"
77 } else {
78 "http://"
79 };
80 let (address, bucket) = rest.split_once('/').unwrap();
81 let builder = S3::default()
82 .bucket(bucket)
83 .region("custom")
84 .access_key_id(access_key_id)
85 .secret_access_key(secret_access_key)
86 .endpoint(&format!("{}{}", endpoint_prefix, address))
87 .disable_config_load()
88 .http_client(Self::new_http_client(&config)?);
89 let op: Operator = Operator::new(builder)?
90 .layer(LoggingLayer::default())
91 .finish();
92
93 Ok(Self {
94 op,
95 media_type: MediaType::Minio,
96 config,
97 metrics,
98 })
99 }
100
101 pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
102 let mut client_builder = reqwest::ClientBuilder::new();
103
104 if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
105 client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
106 }
107
108 if let Some(nodelay) = config.s3.nodelay.as_ref() {
109 client_builder = client_builder.tcp_nodelay(*nodelay);
110 }
111
112 Ok(HttpClient::build(client_builder)?)
113 }
114
115 pub fn new_s3_engine_with_credentials(
118 bucket: &str,
119 config: Arc<ObjectStoreConfig>,
120 metrics: Arc<ObjectStoreMetrics>,
121 aws_access_key_id: &str,
122 aws_secret_access_key: &str,
123 aws_region: &str,
124 ) -> ObjectResult<Self> {
125 let builder = S3::default()
127 .bucket(bucket)
129 .access_key_id(aws_access_key_id)
130 .secret_access_key(aws_secret_access_key)
131 .region(aws_region)
132 .disable_config_load()
133 .http_client(Self::new_http_client(config.as_ref())?);
134
135 let op: Operator = Operator::new(builder)?
136 .layer(LoggingLayer::default())
137 .finish();
138
139 Ok(Self {
140 op,
141 media_type: MediaType::S3,
142 config,
143 metrics,
144 })
145 }
146}