risingwave_connector/source/filesystem/opendal_source/
gcs_source.rs1use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Gcs;
21
22use super::opendal_enumerator::OpendalEnumerator;
23use super::{GcsProperties, OpendalSource};
24use crate::error::ConnectorResult;
25use crate::source::filesystem::s3::enumerator::get_prefix;
26
27impl<Src: OpendalSource> OpendalEnumerator<Src> {
28 pub fn new_gcs_source(gcs_properties: GcsProperties) -> ConnectorResult<Self> {
30 let mut builder = Gcs::default().bucket(&gcs_properties.bucket_name);
32 if let Some(cred) = gcs_properties.credential {
34 builder = builder.credential(&cred);
35 } else {
36 let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
37 if let Ok(cred) = cred {
38 builder = builder.credential(&cred);
39 }
40 }
41
42 if let Some(service_account) = gcs_properties.service_account {
43 builder = builder.service_account(&service_account);
44 }
45 let op: Operator = Operator::new(builder)?
46 .layer(LoggingLayer::default())
47 .layer(RetryLayer::default())
48 .finish();
49
50 let (prefix, matcher) = if let Some(pattern) = gcs_properties.match_pattern.as_ref() {
51 let prefix = get_prefix(pattern);
52 let matcher = glob::Pattern::new(pattern)
53 .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
54 (Some(prefix), Some(matcher))
55 } else {
56 (None, None)
57 };
58
59 let compression_format = gcs_properties.compression_format;
60 Ok(Self {
61 op,
62 prefix,
63 matcher,
64 marker: PhantomData,
65 compression_format,
66 })
67 }
68}