risingwave_connector/sink/file_sink/
azblob.rs1use std::collections::{BTreeMap, HashMap};
16
17use anyhow::anyhow;
18use opendal::Operator;
19use opendal::layers::{LoggingLayer, RetryLayer};
20use opendal::services::Azblob;
21use serde::Deserialize;
22use serde_with::serde_as;
23use with_options::WithOptions;
24
25use super::opendal_sink::{BatchingStrategy, FileSink};
26use crate::sink::file_sink::opendal_sink::OpendalSinkBackend;
27use crate::sink::{Result, SINK_TYPE_APPEND_ONLY, SINK_TYPE_OPTION, SINK_TYPE_UPSERT, SinkError};
28use crate::source::UnknownFields;
29#[derive(Deserialize, Debug, Clone, WithOptions)]
30pub struct AzblobCommon {
31 #[serde(rename = "azblob.container_name")]
32 pub container_name: String,
33 #[serde(rename = "azblob.path")]
35 pub path: String,
36 #[serde(rename = "azblob.credentials.account_name", default)]
37 pub account_name: Option<String>,
38 #[serde(rename = "azblob.credentials.account_key", default)]
39 pub account_key: Option<String>,
40 #[serde(rename = "azblob.endpoint_url")]
41 pub endpoint_url: String,
42}
43
44#[serde_as]
45#[derive(Clone, Debug, Deserialize, WithOptions)]
46pub struct AzblobConfig {
47 #[serde(flatten)]
48 pub common: AzblobCommon,
49
50 #[serde(flatten)]
51 pub batching_strategy: BatchingStrategy,
52
53 pub r#type: String, #[serde(flatten)]
56 pub unknown_fields: HashMap<String, String>,
57}
58
/// Sink name of the Azblob file sink; used as `AzblobSink::SINK_NAME`.
pub const AZBLOB_SINK: &str = "azblob";
60
61impl<S: OpendalSinkBackend> FileSink<S> {
62 pub fn new_azblob_sink(config: AzblobConfig) -> Result<Operator> {
63 let mut builder = Azblob::default();
65 builder = builder
66 .container(&config.common.container_name)
67 .endpoint(&config.common.endpoint_url);
68
69 if let Some(account_name) = config.common.account_name {
70 builder = builder.account_name(&account_name);
71 } else {
72 tracing::warn!(
73 "account_name azblob is not set, container {}",
74 config.common.container_name
75 );
76 }
77
78 if let Some(account_key) = config.common.account_key {
79 builder = builder.account_key(&account_key);
80 } else {
81 tracing::warn!(
82 "account_key azblob is not set, container {}",
83 config.common.container_name
84 );
85 }
86 let operator: Operator = Operator::new(builder)?
87 .layer(LoggingLayer::default())
88 .layer(RetryLayer::default())
89 .finish();
90
91 Ok(operator)
92 }
93}
94
/// Marker type selecting the Azblob backend; it carries no data.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct AzblobSink;
97
98impl UnknownFields for AzblobConfig {
99 fn unknown_fields(&self) -> HashMap<String, String> {
100 self.unknown_fields.clone()
101 }
102}
103
104impl OpendalSinkBackend for AzblobSink {
105 type Properties = AzblobConfig;
106
107 const SINK_NAME: &'static str = AZBLOB_SINK;
108
109 fn from_btreemap(btree_map: BTreeMap<String, String>) -> Result<Self::Properties> {
110 let config =
111 serde_json::from_value::<AzblobConfig>(serde_json::to_value(btree_map).unwrap())
112 .map_err(|e| SinkError::Config(anyhow!(e)))?;
113 if config.r#type != SINK_TYPE_APPEND_ONLY && config.r#type != SINK_TYPE_UPSERT {
114 return Err(SinkError::Config(anyhow!(
115 "`{}` must be {}, or {}",
116 SINK_TYPE_OPTION,
117 SINK_TYPE_APPEND_ONLY,
118 SINK_TYPE_UPSERT
119 )));
120 }
121 Ok(config)
122 }
123
124 fn new_operator(properties: AzblobConfig) -> Result<Operator> {
125 FileSink::<AzblobSink>::new_azblob_sink(properties)
126 }
127
128 fn get_path(properties: Self::Properties) -> String {
129 properties.common.path
130 }
131
132 fn get_engine_type() -> super::opendal_sink::EngineType {
133 super::opendal_sink::EngineType::Azblob
134 }
135
136 fn get_batching_strategy(properties: Self::Properties) -> BatchingStrategy {
137 BatchingStrategy {
138 max_row_count: properties.batching_strategy.max_row_count,
139 rollover_seconds: properties.batching_strategy.rollover_seconds,
140 path_partition_prefix: properties.batching_strategy.path_partition_prefix,
141 }
142 }
143}