// risingwave_connector/sink/file_sink/gcs.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14use std::collections::{BTreeMap, HashMap};
15
16use anyhow::anyhow;
17use opendal::Operator;
18use opendal::layers::{LoggingLayer, RetryLayer};
19use opendal::services::Gcs;
20use serde::Deserialize;
21use serde_with::serde_as;
22use with_options::WithOptions;
23
24use super::opendal_sink::{BatchingStrategy, FileSink};
25use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
26use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
27use crate::source::UnknownFields;
28
/// Connection and location options shared by the GCS file sink,
/// deserialized from the sink's `WITH (...)` clause (`gcs.*` keys).
#[derive(Deserialize, Debug, Clone, WithOptions)]
pub struct GcsCommon {
    /// Name of the GCS bucket the sink writes files into.
    #[serde(rename = "gcs.bucket_name")]
    pub bucket_name: String,

    /// The base64 encoded credential key. If not set, ADC will be used.
    // NOTE(review): this field is a required `String` (no `default` attribute),
    // so "not set" can only mean an explicitly empty value — confirm intended.
    #[serde(rename = "gcs.credential")]
    pub credential: String,

    /// If credential/ADC is not set. The service account can be used to provide the credential info.
    #[serde(rename = "gcs.service_account", default)]
    pub service_account: String,

    /// The directory where the sink file is located
    #[serde(rename = "gcs.path")]
    pub path: String,
}
46
/// Full configuration for the GCS sink: connection options, batching
/// (file rollover) strategy, sink type, plus any unrecognized options.
#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GcsConfig {
    /// Bucket, credential and path options (flattened `gcs.*` keys).
    #[serde(flatten)]
    pub common: GcsCommon,

    /// Row-count / time-based file rollover options (flattened).
    #[serde(flatten)]
    pub batching_strategy: BatchingStrategy,

    pub r#type: String, // accepts "append-only" or "upsert"; validated in `from_btreemap`

    /// Catch-all for options not matched above; surfaced via `UnknownFields`.
    #[serde(flatten)]
    pub unknown_fields: HashMap<String, String>,
}
61
62impl UnknownFields for GcsConfig {
63    fn unknown_fields(&self) -> HashMap<String, String> {
64        self.unknown_fields.clone()
65    }
66}
67
/// Connector name under which this sink is registered.
pub const GCS_SINK: &str = "gcs";
69
70impl<S: OpendalSinkBackend> FileSink<S> {
71    pub fn new_gcs_sink(config: GcsConfig) -> Result<Operator> {
72        // Create gcs builder.
73        let builder = Gcs::default()
74            .bucket(&config.common.bucket_name)
75            .credential(&config.common.credential)
76            .service_account(&config.common.service_account);
77
78        let operator: Operator = Operator::new(builder)?
79            .layer(LoggingLayer::default())
80            .layer(RetryLayer::default())
81            .finish();
82        Ok(operator)
83    }
84}
85
/// Marker type selecting the GCS backend for the generic file sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct GcsSink;
88
89impl OpendalSinkBackend for GcsSink {
90    type Properties = GcsConfig;
91
92    const SINK_NAME: &'static str = GCS_SINK;
93
94    fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
95        let config = serde_json::from_value::<GcsConfig>(serde_json::to_value(btree_map).unwrap())
96            .map_err(|e| SinkError::Config(anyhow!(e)))?;
97        if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
98            return Err(SinkError::Config(anyhow!(
99                "`{}` must be {}, or {}",
100                SINK_TYPE_OPTION,
101                SINK_TYPE_APPEND_ONLY,
102                SINK_TYPE_UPSERT
103            )));
104        }
105        Ok(config)
106    }
107
108    fn new_operator(properties: GcsConfig) -> Result<Operator> {
109        FileSink::<GcsSink>::new_gcs_sink(properties)
110    }
111
112    fn get_path(properties: Self::Properties) -> String {
113        properties.common.path
114    }
115
116    fn get_engine_type() -> super::opendal_sink::EngineType {
117        super::opendal_sink::EngineType::Gcs
118    }
119
120    fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
121        BatchingStrategy {
122            max_row_count: properties.batching_strategy.max_row_count,
123            rollover_seconds: properties.batching_strategy.rollover_seconds,
124            path_partition_prefix: properties.batching_strategy.path_partition_prefix,
125        }
126    }
127}