risingwave_object_store/object/opendal_engine/
mod.rs1pub mod opendal_object_store;
16
17use opendal::layers::{ConcurrentLimitLayer, LoggingLayer};
18use opendal::raw::Access;
19use opendal::{Operator, OperatorBuilder};
20pub use opendal_object_store::*;
21use risingwave_common::config::ObjectStoreConfig;
22
23#[cfg(feature = "hdfs-backend")]
24pub mod hdfs;
25pub mod webhdfs;
26
27pub mod gcs;
28
29pub mod obs;
30
31pub mod azblob;
32pub mod oss;
33pub mod s3;
34
35pub mod fs;
36
37const ATOMIC_WRITE_DIR: &str = "atomic_write_dir/";
39
40fn new_operator(config: &ObjectStoreConfig, builder: OperatorBuilder<impl Access>) -> Operator {
41 const UNLIMITED_OPERATION_CONCURRENCY: usize = usize::MAX >> 3;
43
44 if config.req_concurrency_limit > 0 || config.http_concurrent_limit > 0 {
45 let operation_concurrency_limit = if config.req_concurrency_limit > 0 {
46 config.req_concurrency_limit
47 } else {
48 UNLIMITED_OPERATION_CONCURRENCY
49 };
50 let mut concurrent_limit_layer = ConcurrentLimitLayer::new(operation_concurrency_limit);
51 if config.http_concurrent_limit > 0 {
52 concurrent_limit_layer =
53 concurrent_limit_layer.with_http_concurrent_limit(config.http_concurrent_limit);
54 }
55 builder.layer(concurrent_limit_layer).finish()
56 } else {
57 builder.layer(LoggingLayer::default()).finish()
58 }
59}