risingwave_object_store/object/opendal_engine/
obs.rs1use std::sync::Arc;
16
17use opendal::Operator;
18use opendal::layers::LoggingLayer;
19use opendal::services::Obs;
20use risingwave_common::config::ObjectStoreConfig;
21
22use super::{MediaType, OpendalObjectStore};
23use crate::object::ObjectResult;
24use crate::object::object_metrics::ObjectStoreMetrics;
25
26impl OpendalObjectStore {
27 pub fn new_obs_engine(
29 bucket: String,
30 root: String,
31 config: Arc<ObjectStoreConfig>,
32 metrics: Arc<ObjectStoreMetrics>,
33 ) -> ObjectResult<Self> {
34 let mut builder = Obs::default().bucket(&bucket).root(&root);
36
37 let endpoint = std::env::var("OBS_ENDPOINT")
38 .unwrap_or_else(|_| panic!("OBS_ENDPOINT not found from environment variables"));
39 let access_key_id = std::env::var("OBS_ACCESS_KEY_ID")
40 .unwrap_or_else(|_| panic!("OBS_ACCESS_KEY_ID not found from environment variables"));
41 let secret_access_key = std::env::var("OBS_SECRET_ACCESS_KEY").unwrap_or_else(|_| {
42 panic!("OBS_SECRET_ACCESS_KEY not found from environment variables")
43 });
44
45 builder = builder
46 .endpoint(&endpoint)
47 .access_key_id(&access_key_id)
48 .secret_access_key(&secret_access_key);
49
50 let op: Operator = Operator::new(builder)?
51 .layer(LoggingLayer::default())
52 .finish();
53 Ok(Self {
54 op,
55 media_type: MediaType::Obs,
56 config,
57 metrics,
58 })
59 }
60}