risingwave_connector/source/filesystem/opendal_source/
azblob_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Azblob;
21
22use super::opendal_enumerator::OpendalEnumerator;
23use super::{AzblobProperties, OpendalSource};
24use crate::error::ConnectorResult;
25use crate::source::filesystem::s3::enumerator::get_prefix;
26
27impl<Src: OpendalSource> OpendalEnumerator<Src> {
28    /// create opendal azblob source.
29    pub fn new_azblob_source(azblob_properties: AzblobProperties) -> ConnectorResult<Self> {
30        // Create azblob builder.
31        let mut builder = Azblob::default()
32            .container(&azblob_properties.container_name)
33            .endpoint(&azblob_properties.endpoint_url);
34
35        if let Some(account_name) = azblob_properties.account_name {
36            builder = builder.account_name(&account_name);
37        } else {
38            tracing::warn!(
39                "account_name azblob is not set, container  {}",
40                azblob_properties.container_name
41            );
42        }
43
44        if let Some(account_key) = azblob_properties.account_key {
45            builder = builder.account_key(&account_key);
46        } else {
47            tracing::warn!(
48                "account_key azblob is not set, container  {}",
49                azblob_properties.container_name
50            );
51        }
52        let op: Operator = Operator::new(builder)?
53            .layer(LoggingLayer::default())
54            .layer(RetryLayer::default())
55            .finish();
56
57        let (prefix, matcher) = if let Some(pattern) = azblob_properties.match_pattern.as_ref() {
58            let prefix = get_prefix(pattern);
59            let matcher = glob::Pattern::new(pattern)
60                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
61            (Some(prefix), Some(matcher))
62        } else {
63            (None, None)
64        };
65
66        let compression_format = azblob_properties.compression_format;
67        Ok(Self {
68            op,
69            prefix,
70            matcher,
71            marker: PhantomData,
72            compression_format,
73        })
74    }
75}