risingwave_connector/with_options.rs

// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{BTreeMap, HashMap};
use std::marker::PhantomData;
use std::time::Duration;

use risingwave_pb::id::SecretId;
use risingwave_pb::secret::PbSecretRef;

use crate::error::ConnectorResult;
use crate::sink::catalog::SinkFormatDesc;
use crate::source::cdc::MYSQL_CDC_CONNECTOR;
use crate::source::cdc::external::ExternalCdcTableType;
use crate::source::iceberg::ICEBERG_CONNECTOR;
use crate::source::{
    ADBC_SNOWFLAKE_CONNECTOR, AZBLOB_CONNECTOR, BATCH_POSIX_FS_CONNECTOR, GCS_CONNECTOR,
    KAFKA_CONNECTOR, LEGACY_S3_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR,
    PULSAR_CONNECTOR, UPSTREAM_SOURCE_KEY,
};
/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`; it should not be used
/// manually.
///
/// This is used to ensure the `WITH` options types have reasonable structure.
///
/// TODO: add this bound for sinks. There's a `SourceProperties` trait for sources, but nothing
/// similar for sinks.
pub trait WithOptions {
    #[doc(hidden)]
    #[inline(always)]
    fn assert_receiver_is_with_options(&self) {}
}
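
// The trait is intended to be derived rather than implemented by hand. A minimal sketch of the
// expected usage (the struct and field names below are hypothetical, not taken from this crate):
//
//     #[derive(Debug, Clone, serde::Deserialize, WithOptions)]
//     pub struct MyConnectorProperties {
//         #[serde(rename = "my.endpoint")]
//         pub endpoint: String,
//         #[serde(rename = "my.timeout.ms")]
//         pub timeout_ms: Option<u64>,
//     }
//
// The blanket impls for value types below (`String`, `bool`, `Duration`, maps, ...) are
// presumably what let the fields of such derived structs satisfy the marker bound.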

// Currently, CDC properties are handled specially:
// - They are simply passed as a `HashMap` to the Java Debezium (DBZ) connector.
// - They are not handled by serde.
// - They contain fields other than `WITH` options.
// TODO: remove this workaround and use `#[derive]` for it as well.

impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
    for crate::source::cdc::CdcProperties<T>
{
}

// impl the trait for value types

impl<T: WithOptions> WithOptions for Option<T> {}
impl WithOptions for Vec<String> {}
impl WithOptions for Vec<u64> {}
impl WithOptions for HashMap<String, String> {}
impl WithOptions for BTreeMap<String, String> {}

impl WithOptions for String {}
impl WithOptions for bool {}
impl WithOptions for usize {}
impl WithOptions for u8 {}
impl WithOptions for u16 {}
impl WithOptions for u32 {}
impl WithOptions for u64 {}
impl WithOptions for i32 {}
impl WithOptions for i64 {}
impl WithOptions for f64 {}
impl WithOptions for std::time::Duration {}
impl WithOptions for crate::connector_common::MqttQualityOfService {}
impl WithOptions for crate::sink::file_sink::opendal_sink::PathPartitionPrefix {}
impl WithOptions for crate::sink::kafka::CompressionCodec {}
impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {}
impl WithOptions for nexmark::config::RateShape {}
impl WithOptions for nexmark::event::EventType {}
impl<T> WithOptions for PhantomData<T> {}

pub trait Get {
    fn get(&self, key: &str) -> Option<&String>;
}

pub trait GetKeyIter {
    fn key_iter(&self) -> impl Iterator<Item = &str>;
}

impl GetKeyIter for HashMap<String, String> {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.keys().map(|s| s.as_str())
    }
}

impl Get for HashMap<String, String> {
    fn get(&self, key: &str) -> Option<&String> {
        self.get(key)
    }
}

impl Get for BTreeMap<String, String> {
    fn get(&self, key: &str) -> Option<&String> {
        self.get(key)
    }
}

impl GetKeyIter for BTreeMap<String, String> {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.keys().map(|s| s.as_str())
    }
}

/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
    #[inline(always)]
    fn get_connector(&self) -> Option<String> {
        self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
    }

    #[inline(always)]
    fn is_kafka_connector(&self) -> bool {
        let Some(connector) = self.get_connector() else {
            return false;
        };
        connector == KAFKA_CONNECTOR
    }

    #[inline(always)]
    fn is_pulsar_connector(&self) -> bool {
        let Some(connector) = self.get_connector() else {
            return false;
        };
        connector == PULSAR_CONNECTOR
    }

    #[inline(always)]
    fn is_mysql_cdc_connector(&self) -> bool {
        let Some(connector) = self.get_connector() else {
            return false;
        };
        connector == MYSQL_CDC_CONNECTOR
    }

    #[inline(always)]
    fn get_sync_call_timeout(&self) -> Option<Duration> {
        const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; // only from kafka props, add more if needed
        self.get(SYNC_CALL_TIMEOUT_KEY)
            // Ignoring the parse error is ok here: the field is parsed again when building the
            // properties, which gives a more precise error message.
            .and_then(|s| duration_str::parse_std(s).ok())
    }

    #[inline(always)]
    fn is_cdc_connector(&self) -> bool {
        let Some(connector) = self.get_connector() else {
            return false;
        };
        connector.contains("-cdc")
    }

    /// The source is shared when created with `CREATE SOURCE`, and not shared when created with
    /// `CREATE TABLE`; hence "shareable".
    fn is_shareable_cdc_connector(&self) -> bool {
        self.is_cdc_connector() && ExternalCdcTableType::from_properties(self).can_backfill()
    }

    /// Tables with MySQL and PostgreSQL connectors are maintained for backward compatibility.
    /// The newly added SQL Server CDC connector is only supported when created as shared.
    fn is_shareable_only_cdc_connector(&self) -> bool {
        self.is_cdc_connector() && ExternalCdcTableType::from_properties(self).shareable_only()
    }

    fn enable_transaction_metadata(&self) -> bool {
        ExternalCdcTableType::from_properties(self).enable_transaction_metadata()
    }

    fn is_shareable_non_cdc_connector(&self) -> bool {
        self.is_kafka_connector()
    }

    #[inline(always)]
    fn is_iceberg_connector(&self) -> bool {
        let Some(connector) = self.get_connector() else {
            return false;
        };
        connector == ICEBERG_CONNECTOR
    }

    fn connector_need_pk(&self) -> bool {
        // Currently only the iceberg connector doesn't need a primary key,
        // introduced in https://github.com/risingwavelabs/risingwave/pull/14971.
        // XXX: This doesn't seem to be the right criterion. An Iceberg source doesn't necessarily
        // lack a PK. Is it rather that a "batch source" doesn't need a PK? And for streaming, if
        // the source has a PK, do we want to use it? That seems unsafe.
        !self.is_iceberg_connector()
    }

    fn is_legacy_fs_connector(&self) -> bool {
        self.get(UPSTREAM_SOURCE_KEY)
            .map(|s| s.eq_ignore_ascii_case(LEGACY_S3_CONNECTOR))
            .unwrap_or(false)
    }

    fn is_new_fs_connector(&self) -> bool {
        self.get(UPSTREAM_SOURCE_KEY)
            .map(|s| {
                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
                    || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
            })
            .unwrap_or(false)
    }

    fn is_batch_connector(&self) -> bool {
        self.get(UPSTREAM_SOURCE_KEY)
            .map(|s| {
                s.eq_ignore_ascii_case(BATCH_POSIX_FS_CONNECTOR)
                    || s.eq_ignore_ascii_case(ADBC_SNOWFLAKE_CONNECTOR)
            })
            .unwrap_or(false)
    }

    fn requires_singleton(&self) -> bool {
        self.is_new_fs_connector() || self.is_iceberg_connector() || self.is_batch_connector()
    }
}

impl<T: Get + GetKeyIter> WithPropertiesExt for T {}
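
// A minimal usage sketch (illustrative only; it assumes `UPSTREAM_SOURCE_KEY` is the `connector`
// key and that this `WithPropertiesExt` trait is in scope):
//
//     let props: std::collections::BTreeMap<String, String> = [
//         ("connector".to_owned(), "kafka".to_owned()),
//         ("properties.sync.call.timeout".to_owned(), "5s".to_owned()),
//     ]
//     .into_iter()
//     .collect();
//     assert!(props.is_kafka_connector());
//     assert!(!props.is_cdc_connector());
//     assert_eq!(
//         props.get_sync_call_timeout(),
//         Some(std::time::Duration::from_secs(5))
//     );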

/// Options or properties extracted from the `WITH` clause of DDLs.
#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WithOptionsSecResolved {
    inner: BTreeMap<String, String>,
    secret_ref: BTreeMap<String, PbSecretRef>,
}

impl std::ops::Deref for WithOptionsSecResolved {
    type Target = BTreeMap<String, String>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

impl std::ops::DerefMut for WithOptionsSecResolved {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}

impl WithOptionsSecResolved {
    /// Create a new [`WithOptionsSecResolved`] from an options [`BTreeMap`] and resolved secret refs.
    pub fn new(inner: BTreeMap<String, String>, secret_ref: BTreeMap<String, PbSecretRef>) -> Self {
        Self { inner, secret_ref }
    }

    pub fn as_plaintext(&self) -> &BTreeMap<String, String> {
        &self.inner
    }

    pub fn as_secret(&self) -> &BTreeMap<String, PbSecretRef> {
        &self.secret_ref
    }

    pub fn handle_update(
        &mut self,
        update_alter_props: BTreeMap<String, String>,
        update_alter_secret_refs: BTreeMap<String, PbSecretRef>,
    ) -> ConnectorResult<(Vec<SecretId>, Vec<SecretId>)> {
        let to_add_secret_dep = update_alter_secret_refs
            .values()
            .map(|new_rely_secret| new_rely_secret.secret_id)
            .collect();
        let mut to_remove_secret_dep: Vec<SecretId> = vec![];

        // Make sure the keys in `update_alter_props` and `update_alter_secret_refs` do not collide.
        for key in update_alter_props.keys() {
            if update_alter_secret_refs.contains_key(key) {
                return Err(
                    anyhow::anyhow!("the key {} is set both in plaintext and secret", key).into(),
                );
            }
        }

        // If a key that used to be a secret is now set as plaintext, drop its old secret ref.
        for k in update_alter_props.keys() {
            if let Some(removed_secret) = self.secret_ref.remove(k) {
                to_remove_secret_dep.push(removed_secret.secret_id);
            }
        }
        for (k, v) in &update_alter_secret_refs {
            self.inner.remove(k);

            if let Some(old_secret_ref) = self.secret_ref.get(k) {
                // No need to remove the old entry here; `extend` below overwrites it. We only
                // record the dependency to remove.
                if old_secret_ref.secret_id != v.secret_id {
                    to_remove_secret_dep.push(old_secret_ref.secret_id);
                } else {
                    // If the secret ref is the same, we don't need to update it.
                    continue;
                }
            }
        }

        self.inner.extend(update_alter_props);
        self.secret_ref.extend(update_alter_secret_refs);

        Ok((to_add_secret_dep, to_remove_secret_dep))
    }
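
    // A worked sketch of the bookkeeping above (hypothetical key and secret id; `PbSecretRef`
    // construction elided):
    //
    //     // Before: inner = { "password" => "plain_pw" }, secret_ref = {}
    //     // Call:   handle_update({}, { "password" => <secret ref with secret_id 42> })
    //     // After:  "password" is removed from `inner`,
    //     //         secret_ref = { "password" => <secret_id 42> },
    //     //         and the returned (to_add, to_remove) deps are ([42], []).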

    /// Create a new [`WithOptionsSecResolved`] from an options [`BTreeMap`], with no secret refs.
    pub fn without_secrets(inner: BTreeMap<String, String>) -> Self {
        Self {
            inner,
            secret_ref: Default::default(),
        }
    }

    /// Take the value of the option map and secret refs.
    pub fn into_parts(self) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
        (self.inner, self.secret_ref)
    }

    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
        if let Some(inner_val) = self.inner.get(key)
            && inner_val.eq_ignore_ascii_case(val)
        {
            return true;
        }
        false
    }
}

/// For the `planner_test` crate, so that it does not depend directly on the `connector` crate just
/// for `SinkFormatDesc`.
impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
    type Error = crate::sink::SinkError;

    fn try_from(value: &WithOptionsSecResolved) -> std::result::Result<Self, Self::Error> {
        let connector = value.get(crate::sink::CONNECTOR_TYPE_KEY);
        let r#type = value.get(crate::sink::SINK_TYPE_OPTION);
        match (connector, r#type) {
            (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t),
            _ => Ok(None),
        }
    }
}
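
// Illustrative example of the legacy conversion above (values are assumptions, not taken from this
// file): a sink declared with only `connector = 'kafka'` and `type = 'append-only'` is translated
// into a `SinkFormatDesc` via `SinkFormatDesc::from_legacy_type("kafka", "append-only")`; if either
// option is missing, `Ok(None)` is returned.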

impl Get for WithOptionsSecResolved {
    fn get(&self, key: &str) -> Option<&String> {
        self.inner.get(key)
    }
}

impl GetKeyIter for WithOptionsSecResolved {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.inner.keys().map(|s| s.as_str())
    }
}