risingwave_connector/
with_options.rs1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use risingwave_pb::id::SecretId;
20use risingwave_pb::secret::PbSecretRef;
21
22use crate::error::ConnectorResult;
23use crate::sink::catalog::SinkFormatDesc;
24use crate::source::cdc::MYSQL_CDC_CONNECTOR;
25use crate::source::cdc::external::ExternalCdcTableType;
26use crate::source::iceberg::ICEBERG_CONNECTOR;
27use crate::source::{
28 AZBLOB_CONNECTOR, BATCH_POSIX_FS_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR,
29 LEGACY_S3_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, PULSAR_CONNECTOR,
30 UPSTREAM_SOURCE_KEY,
31};
32
/// Marker trait: it declares no required items, so an implementation is an
/// empty `impl` block.
// NOTE(review): presumably implemented through a derive macro for option
// structs and by hand for leaf value types — confirm against the macro crate.
pub trait WithOptions {
    /// No-op whose only purpose is to make the compiler check that the
    /// receiver's type implements [`WithOptions`].
    #[doc(hidden)]
    #[inline(always)]
    fn assert_receiver_is_with_options(&self) {}
}
44
45impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
52 for crate::source::cdc::CdcProperties<T>
53{
54}
55
56impl<T: WithOptions> WithOptions for Option<T> {}
59impl WithOptions for Vec<String> {}
60impl WithOptions for Vec<u64> {}
61impl WithOptions for HashMap<String, String> {}
62impl WithOptions for BTreeMap<String, String> {}
63
64impl WithOptions for String {}
65impl WithOptions for bool {}
66impl WithOptions for usize {}
67impl WithOptions for u8 {}
68impl WithOptions for u16 {}
69impl WithOptions for u32 {}
70impl WithOptions for u64 {}
71impl WithOptions for i32 {}
72impl WithOptions for i64 {}
73impl WithOptions for f64 {}
74impl WithOptions for std::time::Duration {}
75impl WithOptions for crate::connector_common::MqttQualityOfService {}
76impl WithOptions for crate::sink::file_sink::opendal_sink::PathPartitionPrefix {}
77impl WithOptions for crate::sink::kafka::CompressionCodec {}
78impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {}
79impl WithOptions for nexmark::config::RateShape {}
80impl WithOptions for nexmark::event::EventType {}
81impl<T> WithOptions for PhantomData<T> {}
82
/// Keyed, read-only lookup of a string value.
pub trait Get {
    /// Returns a reference to the value stored under `key`, or `None` when
    /// the key is not present.
    fn get(&self, key: &str) -> Option<&String>;
}
86
/// Exposes the keys of a string-keyed container.
pub trait GetKeyIter {
    /// Returns an iterator that yields every key as a borrowed `&str`.
    fn key_iter(&self) -> impl Iterator<Item = &str>;
}
90
91impl GetKeyIter for HashMap<String, String> {
92 fn key_iter(&self) -> impl Iterator<Item = &str> {
93 self.keys().map(|s| s.as_str())
94 }
95}
96
97impl Get for HashMap<String, String> {
98 fn get(&self, key: &str) -> Option<&String> {
99 self.get(key)
100 }
101}
102
103impl Get for BTreeMap<String, String> {
104 fn get(&self, key: &str) -> Option<&String> {
105 self.get(key)
106 }
107}
108
109impl GetKeyIter for BTreeMap<String, String> {
110 fn key_iter(&self) -> impl Iterator<Item = &str> {
111 self.keys().map(|s| s.as_str())
112 }
113}
114
115pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
117 #[inline(always)]
118 fn get_connector(&self) -> Option<String> {
119 self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
120 }
121
122 #[inline(always)]
123 fn is_kafka_connector(&self) -> bool {
124 let Some(connector) = self.get_connector() else {
125 return false;
126 };
127 connector == KAFKA_CONNECTOR
128 }
129
130 #[inline(always)]
131 fn is_pulsar_connector(&self) -> bool {
132 let Some(connector) = self.get_connector() else {
133 return false;
134 };
135 connector == PULSAR_CONNECTOR
136 }
137
138 #[inline(always)]
139 fn is_mysql_cdc_connector(&self) -> bool {
140 let Some(connector) = self.get_connector() else {
141 return false;
142 };
143 connector == MYSQL_CDC_CONNECTOR
144 }
145
146 #[inline(always)]
147 fn get_sync_call_timeout(&self) -> Option<Duration> {
148 const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; self.get(SYNC_CALL_TIMEOUT_KEY)
150 .and_then(|s| duration_str::parse_std(s).ok())
152 }
153
154 #[inline(always)]
155 fn is_cdc_connector(&self) -> bool {
156 let Some(connector) = self.get_connector() else {
157 return false;
158 };
159 connector.contains("-cdc")
160 }
161
162 fn is_shareable_cdc_connector(&self) -> bool {
164 self.is_cdc_connector() && ExternalCdcTableType::from_properties(self).can_backfill()
165 }
166
167 fn is_shareable_only_cdc_connector(&self) -> bool {
170 self.is_cdc_connector() && ExternalCdcTableType::from_properties(self).shareable_only()
171 }
172
173 fn enable_transaction_metadata(&self) -> bool {
174 ExternalCdcTableType::from_properties(self).enable_transaction_metadata()
175 }
176
177 fn is_shareable_non_cdc_connector(&self) -> bool {
178 self.is_kafka_connector()
179 }
180
181 #[inline(always)]
182 fn is_iceberg_connector(&self) -> bool {
183 let Some(connector) = self.get_connector() else {
184 return false;
185 };
186 connector == ICEBERG_CONNECTOR
187 }
188
189 fn connector_need_pk(&self) -> bool {
190 !self.is_iceberg_connector()
196 }
197
198 fn is_legacy_fs_connector(&self) -> bool {
199 self.get(UPSTREAM_SOURCE_KEY)
200 .map(|s| s.eq_ignore_ascii_case(LEGACY_S3_CONNECTOR))
201 .unwrap_or(false)
202 }
203
204 fn is_new_fs_connector(&self) -> bool {
205 self.get(UPSTREAM_SOURCE_KEY)
206 .map(|s| {
207 s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
208 || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
209 || s.eq_ignore_ascii_case(GCS_CONNECTOR)
210 || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
211 })
212 .unwrap_or(false)
213 }
214
215 fn is_batch_connector(&self) -> bool {
217 self.get(UPSTREAM_SOURCE_KEY)
218 .map(|s| s.eq_ignore_ascii_case(BATCH_POSIX_FS_CONNECTOR))
219 .unwrap_or(false)
220 }
221
222 fn requires_singleton(&self) -> bool {
223 self.is_new_fs_connector() || self.is_iceberg_connector() || self.is_batch_connector()
224 }
225}
226
227impl<T: Get + GetKeyIter> WithPropertiesExt for T {}
228
229#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
231pub struct WithOptionsSecResolved {
232 inner: BTreeMap<String, String>,
233 secret_ref: BTreeMap<String, PbSecretRef>,
234}
235
236impl std::ops::Deref for WithOptionsSecResolved {
237 type Target = BTreeMap<String, String>;
238
239 fn deref(&self) -> &Self::Target {
240 &self.inner
241 }
242}
243
244impl std::ops::DerefMut for WithOptionsSecResolved {
245 fn deref_mut(&mut self) -> &mut Self::Target {
246 &mut self.inner
247 }
248}
249
250impl WithOptionsSecResolved {
251 pub fn new(inner: BTreeMap<String, String>, secret_ref: BTreeMap<String, PbSecretRef>) -> Self {
253 Self { inner, secret_ref }
254 }
255
256 pub fn as_plaintext(&self) -> &BTreeMap<String, String> {
257 &self.inner
258 }
259
260 pub fn as_secret(&self) -> &BTreeMap<String, PbSecretRef> {
261 &self.secret_ref
262 }
263
264 pub fn handle_update(
265 &mut self,
266 update_alter_props: BTreeMap<String, String>,
267 update_alter_secret_refs: BTreeMap<String, PbSecretRef>,
268 ) -> ConnectorResult<(Vec<SecretId>, Vec<SecretId>)> {
269 let to_add_secret_dep = update_alter_secret_refs
270 .values()
271 .map(|new_rely_secret| new_rely_secret.secret_id)
272 .collect();
273 let mut to_remove_secret_dep: Vec<SecretId> = vec![];
274
275 for key in update_alter_props.keys() {
277 if update_alter_secret_refs.contains_key(key) {
278 return Err(
279 anyhow::anyhow!("the key {} is set both in plaintext and secret", key).into(),
280 );
281 }
282 }
283
284 for k in update_alter_props.keys() {
286 if let Some(removed_secret) = self.secret_ref.remove(k) {
287 to_remove_secret_dep.push(removed_secret.secret_id);
288 }
289 }
290 for (k, v) in &update_alter_secret_refs {
291 self.inner.remove(k);
292
293 if let Some(old_secret_ref) = self.secret_ref.get(k) {
294 if old_secret_ref.secret_id != v.secret_id {
296 to_remove_secret_dep.push(old_secret_ref.secret_id);
297 } else {
298 continue;
300 }
301 }
302 }
303
304 self.inner.extend(update_alter_props);
305 self.secret_ref.extend(update_alter_secret_refs);
306
307 Ok((to_add_secret_dep, to_remove_secret_dep))
308 }
309
310 pub fn without_secrets(inner: BTreeMap<String, String>) -> Self {
312 Self {
313 inner,
314 secret_ref: Default::default(),
315 }
316 }
317
318 pub fn into_parts(self) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
320 (self.inner, self.secret_ref)
321 }
322
323 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
324 if let Some(inner_val) = self.inner.get(key)
325 && inner_val.eq_ignore_ascii_case(val)
326 {
327 return true;
328 }
329 false
330 }
331}
332
333impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
335 type Error = crate::sink::SinkError;
336
337 fn try_from(value: &WithOptionsSecResolved) -> std::result::Result<Self, Self::Error> {
338 let connector = value.get(crate::sink::CONNECTOR_TYPE_KEY);
339 let r#type = value.get(crate::sink::SINK_TYPE_OPTION);
340 match (connector, r#type) {
341 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t),
342 _ => Ok(None),
343 }
344 }
345}
346
347impl Get for WithOptionsSecResolved {
348 fn get(&self, key: &str) -> Option<&String> {
349 self.inner.get(key)
350 }
351}
352
353impl GetKeyIter for WithOptionsSecResolved {
354 fn key_iter(&self) -> impl Iterator<Item = &str> {
355 self.inner.keys().map(|s| s.as_str())
356 }
357}