risingwave_connector/
with_options.rs1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use risingwave_pb::secret::PbSecretRef;
20
21use crate::error::ConnectorResult;
22use crate::sink::catalog::SinkFormatDesc;
23use crate::source::cdc::MYSQL_CDC_CONNECTOR;
24use crate::source::cdc::external::CdcTableType;
25use crate::source::iceberg::ICEBERG_CONNECTOR;
26use crate::source::{
27 AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, LEGACY_S3_CONNECTOR, OPENDAL_S3_CONNECTOR,
28 POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
29};
30
/// Marker trait implemented by types that are accepted as `WITH` option values.
///
/// The trait has no required items; implementing it is purely a declaration
/// that a type belongs to the accepted set.
pub trait WithOptions {
    /// No-op hook that is hidden from the generated documentation.
    ///
    /// NOTE(review): presumably called by derive-generated code so that a field
    /// whose type lacks a `WithOptions` impl fails to compile — confirm against
    /// the derive macro.
    #[doc(hidden)]
    #[inline(always)]
    fn assert_receiver_is_with_options(&self) {}
}
42
43impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
50 for crate::source::cdc::CdcProperties<T>
51{
52}
53
54impl<T: WithOptions> WithOptions for Option<T> {}
57impl WithOptions for Vec<String> {}
58impl WithOptions for Vec<u64> {}
59impl WithOptions for HashMap<String, String> {}
60impl WithOptions for BTreeMap<String, String> {}
61
62impl WithOptions for String {}
63impl WithOptions for bool {}
64impl WithOptions for usize {}
65impl WithOptions for u8 {}
66impl WithOptions for u16 {}
67impl WithOptions for u32 {}
68impl WithOptions for u64 {}
69impl WithOptions for i32 {}
70impl WithOptions for i64 {}
71impl WithOptions for f64 {}
72impl WithOptions for std::time::Duration {}
// Option value types declared by individual connectors and by the `nexmark`
// crate.
impl WithOptions for crate::connector_common::MqttQualityOfService {}
impl WithOptions for crate::sink::file_sink::opendal_sink::PathPartitionPrefix {}
impl WithOptions for crate::sink::kafka::CompressionCodec {}
impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {}
impl WithOptions for nexmark::config::RateShape {}
impl WithOptions for nexmark::event::EventType {}
// `PhantomData<T>` is accepted for every `T`; no `T: WithOptions` bound is
// required here.
impl<T> WithOptions for PhantomData<T> {}
80
/// Keyed lookup over a string-to-string property collection.
pub trait Get {
    /// Returns a reference to the value stored under `key`, or `None` when the
    /// implementor reports no entry for that key.
    fn get(&self, key: &str) -> Option<&String>;
}
84
/// Iteration over the keys of a string-keyed property collection.
pub trait GetKeyIter {
    /// Returns an iterator that yields each key as a `&str` borrowed from
    /// `self`.
    fn key_iter(&self) -> impl Iterator<Item = &str>;
}
88
89impl GetKeyIter for HashMap<String, String> {
90 fn key_iter(&self) -> impl Iterator<Item = &str> {
91 self.keys().map(|s| s.as_str())
92 }
93}
94
95impl Get for HashMap<String, String> {
96 fn get(&self, key: &str) -> Option<&String> {
97 self.get(key)
98 }
99}
100
101impl Get for BTreeMap<String, String> {
102 fn get(&self, key: &str) -> Option<&String> {
103 self.get(key)
104 }
105}
106
107impl GetKeyIter for BTreeMap<String, String> {
108 fn key_iter(&self) -> impl Iterator<Item = &str> {
109 self.keys().map(|s| s.as_str())
110 }
111}
112
113pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
115 #[inline(always)]
116 fn get_connector(&self) -> Option<String> {
117 self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
118 }
119
120 #[inline(always)]
121 fn is_kafka_connector(&self) -> bool {
122 let Some(connector) = self.get_connector() else {
123 return false;
124 };
125 connector == KAFKA_CONNECTOR
126 }
127
128 #[inline(always)]
129 fn is_mysql_cdc_connector(&self) -> bool {
130 let Some(connector) = self.get_connector() else {
131 return false;
132 };
133 connector == MYSQL_CDC_CONNECTOR
134 }
135
136 #[inline(always)]
137 fn get_sync_call_timeout(&self) -> Option<Duration> {
138 const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; self.get(SYNC_CALL_TIMEOUT_KEY)
140 .and_then(|s| duration_str::parse_std(s).ok())
142 }
143
144 #[inline(always)]
145 fn is_cdc_connector(&self) -> bool {
146 let Some(connector) = self.get_connector() else {
147 return false;
148 };
149 connector.contains("-cdc")
150 }
151
152 fn is_shareable_cdc_connector(&self) -> bool {
154 self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill()
155 }
156
157 fn is_shareable_only_cdc_connector(&self) -> bool {
160 self.is_cdc_connector() && CdcTableType::from_properties(self).shareable_only()
161 }
162
163 fn enable_transaction_metadata(&self) -> bool {
164 CdcTableType::from_properties(self).enable_transaction_metadata()
165 }
166
167 fn is_shareable_non_cdc_connector(&self) -> bool {
168 self.is_kafka_connector()
169 }
170
171 #[inline(always)]
172 fn is_iceberg_connector(&self) -> bool {
173 let Some(connector) = self.get_connector() else {
174 return false;
175 };
176 connector == ICEBERG_CONNECTOR
177 }
178
179 fn connector_need_pk(&self) -> bool {
180 !self.is_iceberg_connector()
186 }
187
188 fn is_legacy_fs_connector(&self) -> bool {
189 self.get(UPSTREAM_SOURCE_KEY)
190 .map(|s| s.eq_ignore_ascii_case(LEGACY_S3_CONNECTOR))
191 .unwrap_or(false)
192 }
193
194 fn is_new_fs_connector(&self) -> bool {
195 self.get(UPSTREAM_SOURCE_KEY)
196 .map(|s| {
197 s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
198 || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
199 || s.eq_ignore_ascii_case(GCS_CONNECTOR)
200 || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
201 })
202 .unwrap_or(false)
203 }
204
205 fn is_batch_connector(&self) -> bool {
207 false
208 }
213
214 fn requires_singleton(&self) -> bool {
215 self.is_new_fs_connector() || self.is_iceberg_connector() || self.is_batch_connector()
216 }
217}
218
219impl<T: Get + GetKeyIter> WithPropertiesExt for T {}
220
/// A set of options split into two disjointly-stored parts: plaintext
/// key/value pairs and keys whose value is supplied through a secret
/// reference.
///
/// Dereferences to the plaintext map, so map methods called on this type
/// operate on `inner` only.
#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WithOptionsSecResolved {
    /// Plaintext options, keyed by option name.
    inner: BTreeMap<String, String>,
    /// Secret references, keyed by option name.
    secret_ref: BTreeMap<String, PbSecretRef>,
}
227
228impl std::ops::Deref for WithOptionsSecResolved {
229 type Target = BTreeMap<String, String>;
230
231 fn deref(&self) -> &Self::Target {
232 &self.inner
233 }
234}
235
236impl std::ops::DerefMut for WithOptionsSecResolved {
237 fn deref_mut(&mut self) -> &mut Self::Target {
238 &mut self.inner
239 }
240}
241
242impl WithOptionsSecResolved {
243 pub fn new(inner: BTreeMap<String, String>, secret_ref: BTreeMap<String, PbSecretRef>) -> Self {
245 Self { inner, secret_ref }
246 }
247
248 pub fn as_plaintext(&self) -> &BTreeMap<String, String> {
249 &self.inner
250 }
251
252 pub fn as_secret(&self) -> &BTreeMap<String, PbSecretRef> {
253 &self.secret_ref
254 }
255
256 pub fn handle_update(
257 &mut self,
258 update_alter_props: BTreeMap<String, String>,
259 update_alter_secret_refs: BTreeMap<String, PbSecretRef>,
260 ) -> ConnectorResult<(Vec<u32>, Vec<u32>)> {
261 let to_add_secret_dep = update_alter_secret_refs
262 .values()
263 .map(|new_rely_secret| new_rely_secret.secret_id)
264 .collect();
265 let mut to_remove_secret_dep: Vec<u32> = vec![];
266
267 for key in update_alter_props.keys() {
269 if update_alter_secret_refs.contains_key(key) {
270 return Err(
271 anyhow::anyhow!("the key {} is set both in plaintext and secret", key).into(),
272 );
273 }
274 }
275
276 for k in update_alter_props.keys() {
278 if let Some(removed_secret) = self.secret_ref.remove(k) {
279 to_remove_secret_dep.push(removed_secret.secret_id);
280 }
281 }
282 for (k, v) in &update_alter_secret_refs {
283 self.inner.remove(k);
284
285 if let Some(old_secret_ref) = self.secret_ref.get(k) {
286 if old_secret_ref.secret_id != v.secret_id {
288 to_remove_secret_dep.push(old_secret_ref.secret_id);
289 } else {
290 continue;
292 }
293 }
294 }
295
296 self.inner.extend(update_alter_props);
297 self.secret_ref.extend(update_alter_secret_refs);
298
299 Ok((to_add_secret_dep, to_remove_secret_dep))
300 }
301
302 pub fn without_secrets(inner: BTreeMap<String, String>) -> Self {
304 Self {
305 inner,
306 secret_ref: Default::default(),
307 }
308 }
309
310 pub fn into_parts(self) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
312 (self.inner, self.secret_ref)
313 }
314
315 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
316 if let Some(inner_val) = self.inner.get(key)
317 && inner_val.eq_ignore_ascii_case(val)
318 {
319 return true;
320 }
321 false
322 }
323}
324
325impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
327 type Error = crate::sink::SinkError;
328
329 fn try_from(value: &WithOptionsSecResolved) -> std::result::Result<Self, Self::Error> {
330 let connector = value.get(crate::sink::CONNECTOR_TYPE_KEY);
331 let r#type = value.get(crate::sink::SINK_TYPE_OPTION);
332 match (connector, r#type) {
333 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t),
334 _ => Ok(None),
335 }
336 }
337}
338
339impl Get for WithOptionsSecResolved {
340 fn get(&self, key: &str) -> Option<&String> {
341 self.inner.get(key)
342 }
343}
344
345impl GetKeyIter for WithOptionsSecResolved {
346 fn key_iter(&self) -> impl Iterator<Item = &str> {
347 self.inner.keys().map(|s| s.as_str())
348 }
349}