risingwave_object_store/object/opendal_engine/
obs.rs1use std::sync::Arc;
16
17use opendal::Operator;
18use opendal::layers::LoggingLayer;
19use opendal::services::Obs;
20use risingwave_common::config::ObjectStoreConfig;
21
22use super::{MediaType, OpendalObjectStore, new_operator};
23use crate::object::ObjectResult;
24use crate::object::object_metrics::ObjectStoreMetrics;
25
26impl OpendalObjectStore {
27 pub fn new_obs_engine(
29 bucket: String,
30 root: String,
31 config: Arc<ObjectStoreConfig>,
32 metrics: Arc<ObjectStoreMetrics>,
33 ) -> ObjectResult<Self> {
34 let mut builder = Obs::default().bucket(&bucket).root(&root);
36
37 let endpoint = std::env::var("OBS_ENDPOINT")
38 .unwrap_or_else(|_| panic!("OBS_ENDPOINT not found from environment variables"));
39 let access_key_id = std::env::var("OBS_ACCESS_KEY_ID")
40 .unwrap_or_else(|_| panic!("OBS_ACCESS_KEY_ID not found from environment variables"));
41 let secret_access_key = std::env::var("OBS_SECRET_ACCESS_KEY").unwrap_or_else(|_| {
42 panic!("OBS_SECRET_ACCESS_KEY not found from environment variables")
43 });
44
45 builder = builder
46 .endpoint(&endpoint)
47 .access_key_id(&access_key_id)
48 .secret_access_key(&secret_access_key);
49
50 let op = new_operator(
51 &config,
52 Operator::new(builder)?.layer(LoggingLayer::default()),
53 );
54
55 Ok(Self {
56 op,
57 media_type: MediaType::Obs,
58 config,
59 metrics,
60 })
61 }
62}