risingwave_object_store/object/opendal_engine/
opendal_s3.rsuse std::sync::Arc;
use std::time::Duration;
use opendal::layers::LoggingLayer;
use opendal::raw::HttpClient;
use opendal::services::S3;
use opendal::Operator;
use risingwave_common::config::ObjectStoreConfig;
use super::{MediaType, OpendalObjectStore};
use crate::object::object_metrics::ObjectStoreMetrics;
use crate::object::ObjectResult;
impl OpendalObjectStore {
pub fn new_s3_engine(
bucket: String,
config: Arc<ObjectStoreConfig>,
metrics: Arc<ObjectStoreMetrics>,
) -> ObjectResult<Self> {
let mut builder = S3::default().bucket(&bucket);
if let Ok(endpoint_url) = std::env::var("RW_S3_ENDPOINT") {
builder = builder.endpoint(&endpoint_url);
}
if std::env::var("RW_IS_FORCE_PATH_STYLE").is_err() {
builder = builder.enable_virtual_host_style();
}
let http_client = Self::new_http_client(&config)?;
builder = builder.http_client(http_client);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.finish();
Ok(Self {
op,
media_type: MediaType::S3,
config,
metrics,
})
}
pub fn new_minio_engine(
server: &str,
config: Arc<ObjectStoreConfig>,
metrics: Arc<ObjectStoreMetrics>,
) -> ObjectResult<Self> {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();
let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();
let builder = S3::default()
.bucket(bucket)
.region("custom")
.access_key_id(access_key_id)
.secret_access_key(secret_access_key)
.endpoint(&format!("{}{}", endpoint_prefix, address))
.disable_config_load()
.http_client(Self::new_http_client(&config)?);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.finish();
Ok(Self {
op,
media_type: MediaType::Minio,
config,
metrics,
})
}
pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();
if let Some(keepalive_ms) = config.s3.keepalive_ms.as_ref() {
client_builder = client_builder.tcp_keepalive(Duration::from_millis(*keepalive_ms));
}
if let Some(nodelay) = config.s3.nodelay.as_ref() {
client_builder = client_builder.tcp_nodelay(*nodelay);
}
Ok(HttpClient::build(client_builder)?)
}
pub fn new_s3_engine_with_credentials(
bucket: &str,
config: Arc<ObjectStoreConfig>,
metrics: Arc<ObjectStoreMetrics>,
aws_access_key_id: &str,
aws_secret_access_key: &str,
aws_region: &str,
) -> ObjectResult<Self> {
let builder = S3::default()
.bucket(bucket)
.access_key_id(aws_access_key_id)
.secret_access_key(aws_secret_access_key)
.region(aws_region)
.disable_config_load()
.http_client(Self::new_http_client(config.as_ref())?);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.finish();
Ok(Self {
op,
media_type: MediaType::S3,
config,
metrics,
})
}
}