risingwave_object_store/object/opendal_engine/
mod.rs

1// Copyright 2023 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod opendal_object_store;
16
17use opendal::layers::{ConcurrentLimitLayer, LoggingLayer};
18use opendal::raw::Access;
19use opendal::{Operator, OperatorBuilder};
20pub use opendal_object_store::*;
21use risingwave_common::config::ObjectStoreConfig;
22
23#[cfg(feature = "hdfs-backend")]
24pub mod hdfs;
25pub mod webhdfs;
26
27pub mod gcs;
28
29pub mod obs;
30
31pub mod azblob;
32pub mod oss;
33pub mod s3;
34
35pub mod fs;
36
// To make sure the operation is consistent, we should explicitly set `atomic_write_dir` for the fs, hdfs and webhdfs services.
38const ATOMIC_WRITE_DIR: &str = "atomic_write_dir/";
39
40fn new_operator(config: &ObjectStoreConfig, builder: OperatorBuilder<impl Access>) -> Operator {
41    // Tokio semaphore rejects values above `usize::MAX >> 3`.
42    const UNLIMITED_OPERATION_CONCURRENCY: usize = usize::MAX >> 3;
43
44    if config.req_concurrency_limit > 0 || config.http_concurrent_limit > 0 {
45        let operation_concurrency_limit = if config.req_concurrency_limit > 0 {
46            config.req_concurrency_limit
47        } else {
48            UNLIMITED_OPERATION_CONCURRENCY
49        };
50        let mut concurrent_limit_layer = ConcurrentLimitLayer::new(operation_concurrency_limit);
51        if config.http_concurrent_limit > 0 {
52            concurrent_limit_layer =
53                concurrent_limit_layer.with_http_concurrent_limit(config.http_concurrent_limit);
54        }
55        builder.layer(concurrent_limit_layer).finish()
56    } else {
57        builder.layer(LoggingLayer::default()).finish()
58    }
59}