risingwave_connector/source/filesystem/opendal_source/
azblob_source.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Azblob;
21
22use super::opendal_enumerator::OpendalEnumerator;
23use super::{AzblobProperties, OpendalSource};
24use crate::error::ConnectorResult;
25use crate::source::filesystem::s3::enumerator::get_prefix;
26
27impl<Src: OpendalSource> OpendalEnumerator<Src> {
28 pub fn new_azblob_source(azblob_properties: AzblobProperties) -> ConnectorResult<Self> {
30 let mut builder = Azblob::default()
32 .container(&azblob_properties.container_name)
33 .endpoint(&azblob_properties.endpoint_url);
34
35 if let Some(account_name) = azblob_properties.account_name {
36 builder = builder.account_name(&account_name);
37 } else {
38 tracing::warn!(
39 "account_name azblob is not set, container {}",
40 azblob_properties.container_name
41 );
42 }
43
44 if let Some(account_key) = azblob_properties.account_key {
45 builder = builder.account_key(&account_key);
46 } else {
47 tracing::warn!(
48 "account_key azblob is not set, container {}",
49 azblob_properties.container_name
50 );
51 }
52 let op: Operator = Operator::new(builder)?
53 .layer(LoggingLayer::default())
54 .layer(RetryLayer::default())
55 .finish();
56
57 let (prefix, matcher) = if let Some(pattern) = azblob_properties.match_pattern.as_ref() {
58 let prefix = get_prefix(pattern);
59 let matcher = glob::Pattern::new(pattern)
60 .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
61 (Some(prefix), Some(matcher))
62 } else {
63 (None, None)
64 };
65
66 let compression_format = azblob_properties.compression_format;
67 Ok(Self {
68 op,
69 prefix,
70 matcher,
71 marker: PhantomData,
72 compression_format,
73 })
74 }
75}