risingwave_connector/source/filesystem/opendal_source/
gcs_source.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::marker::PhantomData;
16
17use anyhow::Context;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Gcs;
21
22use super::opendal_enumerator::OpendalEnumerator;
23use super::{GcsProperties, OpendalSource};
24use crate::error::ConnectorResult;
25use crate::source::filesystem::s3::enumerator::get_prefix;
26
27impl<Src: OpendalSource> OpendalEnumerator<Src> {
28    /// create opendal gcs source.
29    pub fn new_gcs_source(gcs_properties: GcsProperties) -> ConnectorResult<Self> {
30        // Create gcs builder.
31        let mut builder = Gcs::default().bucket(&gcs_properties.bucket_name);
32        // if credential env is set, use it. Otherwise, ADC will be used.
33        if let Some(cred) = gcs_properties.credential {
34            builder = builder.credential(&cred);
35        } else {
36            let cred = std::env::var("GOOGLE_APPLICATION_CREDENTIALS");
37            if let Ok(cred) = cred {
38                builder = builder.credential(&cred);
39            }
40        }
41
42        if let Some(service_account) = gcs_properties.service_account {
43            builder = builder.service_account(&service_account);
44        }
45        let op: Operator = Operator::new(builder)?
46            .layer(LoggingLayer::default())
47            .layer(RetryLayer::default())
48            .finish();
49
50        let (prefix, matcher) = if let Some(pattern) = gcs_properties.match_pattern.as_ref() {
51            let prefix = get_prefix(pattern);
52            let matcher = glob::Pattern::new(pattern)
53                .with_context(|| format!("Invalid match_pattern: {}", pattern))?;
54            (Some(prefix), Some(matcher))
55        } else {
56            (None, None)
57        };
58
59        let compression_format = gcs_properties.compression_format;
60        Ok(Self {
61            op,
62            prefix,
63            matcher,
64            marker: PhantomData,
65            compression_format,
66        })
67    }
68}