risingwave_connector/source/filesystem/opendal_source/
gcs_source.rsuse std::marker::PhantomData;
use anyhow::Context;
use opendal::layers::{LoggingLayer, RetryLayer};
use opendal::services::Gcs;
use opendal::Operator;
use super::opendal_enumerator::OpendalEnumerator;
use super::{GcsProperties, OpendalSource};
use crate::error::ConnectorResult;
use crate::source::filesystem::s3::enumerator::get_prefix;
impl<Src: OpendalSource> OpendalEnumerator<Src> {
pub fn new_gcs_source(gcs_properties: GcsProperties) -> ConnectorResult<Self> {
let mut builder = Gcs::default().bucket(&gcs_properties.bucket_name);
if let Some(cred) = gcs_properties.credential {
builder = builder.credential(&cred);
} else {
let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
if let Ok(cred) = cred {
builder = builder.credential(&cred);
}
}
if let Some(service_account) = gcs_properties.service_account {
builder = builder.service_account(&service_account);
}
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(RetryLayer::default())
.finish();
let (prefix, matcher) = if let Some(pattern) = gcs_properties.match_pattern.as_ref() {
let prefix = get_prefix(pattern);
let matcher = glob::Pattern::new(pattern)
.with_context(|| format!("Invalid match_pattern: {}", pattern))?;
(Some(prefix), Some(matcher))
} else {
(None, None)
};
let compression_format = gcs_properties.compression_format;
Ok(Self {
op,
prefix,
matcher,
marker: PhantomData,
compression_format,
})
}
}