risingwave_connector/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use risingwave_pb::secret::PbSecretRef;
20
21use crate::error::ConnectorResult;
22use crate::sink::catalog::SinkFormatDesc;
23use crate::source::cdc::MYSQL_CDC_CONNECTOR;
24use crate::source::cdc::external::ExternalCdcTableType;
25use crate::source::iceberg::ICEBERG_CONNECTOR;
26use crate::source::{
27    AZBLOB_CONNECTOR, BATCH_POSIX_FS_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR,
28    LEGACY_S3_CONNECTOR, OPENDAL_S3_CONNECTOR, POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
29};
30
/// Marker trait for types that describe `WITH` options.
///
/// Intended to be implemented only through `#[derive(WithOptions)]`, not manually. It exists to
/// make sure the types holding `WITH` options have a reasonable structure.
///
/// TODO: require this bound for sinks as well. Sources already have the `SourceProperties`
/// trait, but there is no equivalent for sinks yet.
pub trait WithOptions {
    /// Empty default method: calling it on a value only type-checks when that value's type
    /// implements [`WithOptions`].
    #[inline(always)]
    #[doc(hidden)]
    fn assert_receiver_is_with_options(&self) {
        // Intentionally empty.
    }
}
42
// CDC properties are currently handled specially:
// - they are passed as a plain `HashMap` to the Java DBZ side;
// - they are not deserialized with serde;
// - they contain fields other than `WITH` options.
// TODO: remove this workaround and use `#[derive(WithOptions)]` for them as well.
48
49impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
50    for crate::source::cdc::CdcProperties<T>
51{
52}
53
54// impl the trait for value types
55
56impl<T: WithOptions> WithOptions for Option<T> {}
57impl WithOptions for Vec<String> {}
58impl WithOptions for Vec<u64> {}
59impl WithOptions for HashMap<String, String> {}
60impl WithOptions for BTreeMap<String, String> {}
61
62impl WithOptions for String {}
63impl WithOptions for bool {}
64impl WithOptions for usize {}
65impl WithOptions for u8 {}
66impl WithOptions for u16 {}
67impl WithOptions for u32 {}
68impl WithOptions for u64 {}
69impl WithOptions for i32 {}
70impl WithOptions for i64 {}
71impl WithOptions for f64 {}
72impl WithOptions for std::time::Duration {}
73impl WithOptions for crate::connector_common::MqttQualityOfService {}
74impl WithOptions for crate::sink::file_sink::opendal_sink::PathPartitionPrefix {}
75impl WithOptions for crate::sink::kafka::CompressionCodec {}
76impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {}
77impl WithOptions for nexmark::config::RateShape {}
78impl WithOptions for nexmark::event::EventType {}
79impl<T> WithOptions for PhantomData<T> {}
80
/// Key lookup over a map of `WITH` options.
pub trait Get {
    /// Returns the value stored under `key`, if any.
    fn get(&self, key: &str) -> Option<&String>;
}

// The trait method shares its name with the maps' inherent `get`. Calling the inherent method
// through its path makes it obvious that these impls delegate instead of recursing.
impl Get for HashMap<String, String> {
    fn get(&self, key: &str) -> Option<&String> {
        HashMap::get(self, key)
    }
}

impl Get for BTreeMap<String, String> {
    fn get(&self, key: &str) -> Option<&String> {
        BTreeMap::get(self, key)
    }
}

/// Iteration over the keys of a map of `WITH` options.
pub trait GetKeyIter {
    /// Returns an iterator over all option keys.
    fn key_iter(&self) -> impl Iterator<Item = &str>;
}

impl GetKeyIter for HashMap<String, String> {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.keys().map(String::as_str)
    }
}

impl GetKeyIter for BTreeMap<String, String> {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.keys().map(String::as_str)
    }
}
112
113/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
114pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
115    #[inline(always)]
116    fn get_connector(&self) -> Option<String> {
117        self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
118    }
119
120    #[inline(always)]
121    fn is_kafka_connector(&self) -> bool {
122        let Some(connector) = self.get_connector() else {
123            return false;
124        };
125        connector == KAFKA_CONNECTOR
126    }
127
128    #[inline(always)]
129    fn is_mysql_cdc_connector(&self) -> bool {
130        let Some(connector) = self.get_connector() else {
131            return false;
132        };
133        connector == MYSQL_CDC_CONNECTOR
134    }
135
136    #[inline(always)]
137    fn get_sync_call_timeout(&self) -> Option<Duration> {
138        const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; // only from kafka props, add more if needed
139        self.get(SYNC_CALL_TIMEOUT_KEY)
140            // ignore the error is ok here, because we will parse the field again when building the properties and has more precise error message
141            .and_then(|s| duration_str::parse_std(s).ok())
142    }
143
144    #[inline(always)]
145    fn is_cdc_connector(&self) -> bool {
146        let Some(connector) = self.get_connector() else {
147            return false;
148        };
149        connector.contains("-cdc")
150    }
151
152    /// It is shared when `CREATE SOURCE`, and not shared when `CREATE TABLE`. So called "shareable".
153    fn is_shareable_cdc_connector(&self) -> bool {
154        self.is_cdc_connector() && ExternalCdcTableType::from_properties(self).can_backfill()
155    }
156
157    /// Tables with MySQL and PostgreSQL connectors are maintained for backward compatibility.
158    /// The newly added SQL Server CDC connector is only supported when created as shared.
159    fn is_shareable_only_cdc_connector(&self) -> bool {
160        self.is_cdc_connector() && ExternalCdcTableType::from_properties(self).shareable_only()
161    }
162
163    fn enable_transaction_metadata(&self) -> bool {
164        ExternalCdcTableType::from_properties(self).enable_transaction_metadata()
165    }
166
167    fn is_shareable_non_cdc_connector(&self) -> bool {
168        self.is_kafka_connector()
169    }
170
171    #[inline(always)]
172    fn is_iceberg_connector(&self) -> bool {
173        let Some(connector) = self.get_connector() else {
174            return false;
175        };
176        connector == ICEBERG_CONNECTOR
177    }
178
179    fn connector_need_pk(&self) -> bool {
180        // Currently only iceberg connector doesn't need primary key
181        // introduced in https://github.com/risingwavelabs/risingwave/pull/14971
182        // XXX: This seems not the correct way. Iceberg doesn't necessarily lack a PK.
183        // "batch source" doesn't need a PK?
184        // For streaming, if it has a PK, do we want to use it? It seems not safe.
185        !self.is_iceberg_connector()
186    }
187
188    fn is_legacy_fs_connector(&self) -> bool {
189        self.get(UPSTREAM_SOURCE_KEY)
190            .map(|s| s.eq_ignore_ascii_case(LEGACY_S3_CONNECTOR))
191            .unwrap_or(false)
192    }
193
194    fn is_new_fs_connector(&self) -> bool {
195        self.get(UPSTREAM_SOURCE_KEY)
196            .map(|s| {
197                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
198                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
199                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
200                    || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
201            })
202            .unwrap_or(false)
203    }
204
205    /// See [`crate::source::batch::BatchSourceSplit`] for more details.
206    fn is_batch_connector(&self) -> bool {
207        self.get(UPSTREAM_SOURCE_KEY)
208            .map(|s| s.eq_ignore_ascii_case(BATCH_POSIX_FS_CONNECTOR))
209            .unwrap_or(false)
210    }
211
212    fn requires_singleton(&self) -> bool {
213        self.is_new_fs_connector() || self.is_iceberg_connector() || self.is_batch_connector()
214    }
215}
216
217impl<T: Get + GetKeyIter> WithPropertiesExt for T {}
218
219/// Options or properties extracted from the `WITH` clause of DDLs.
220#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
221pub struct WithOptionsSecResolved {
222    inner: BTreeMap<String, String>,
223    secret_ref: BTreeMap<String, PbSecretRef>,
224}
225
226impl std::ops::Deref for WithOptionsSecResolved {
227    type Target = BTreeMap<String, String>;
228
229    fn deref(&self) -> &Self::Target {
230        &self.inner
231    }
232}
233
234impl std::ops::DerefMut for WithOptionsSecResolved {
235    fn deref_mut(&mut self) -> &mut Self::Target {
236        &mut self.inner
237    }
238}
239
240impl WithOptionsSecResolved {
241    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and resolved secret ref.
242    pub fn new(inner: BTreeMap<String, String>, secret_ref: BTreeMap<String, PbSecretRef>) -> Self {
243        Self { inner, secret_ref }
244    }
245
246    pub fn as_plaintext(&self) -> &BTreeMap<String, String> {
247        &self.inner
248    }
249
250    pub fn as_secret(&self) -> &BTreeMap<String, PbSecretRef> {
251        &self.secret_ref
252    }
253
254    pub fn handle_update(
255        &mut self,
256        update_alter_props: BTreeMap<String, String>,
257        update_alter_secret_refs: BTreeMap<String, PbSecretRef>,
258    ) -> ConnectorResult<(Vec<u32>, Vec<u32>)> {
259        let to_add_secret_dep = update_alter_secret_refs
260            .values()
261            .map(|new_rely_secret| new_rely_secret.secret_id)
262            .collect();
263        let mut to_remove_secret_dep: Vec<u32> = vec![];
264
265        // make sure the key in update_alter_props and update_alter_secret_refs not collide
266        for key in update_alter_props.keys() {
267            if update_alter_secret_refs.contains_key(key) {
268                return Err(
269                    anyhow::anyhow!("the key {} is set both in plaintext and secret", key).into(),
270                );
271            }
272        }
273
274        // remove legacy key if it's set in both plaintext and secret
275        for k in update_alter_props.keys() {
276            if let Some(removed_secret) = self.secret_ref.remove(k) {
277                to_remove_secret_dep.push(removed_secret.secret_id);
278            }
279        }
280        for (k, v) in &update_alter_secret_refs {
281            self.inner.remove(k);
282
283            if let Some(old_secret_ref) = self.secret_ref.get(k) {
284                // no need to remove, do extend later
285                if old_secret_ref.secret_id != v.secret_id {
286                    to_remove_secret_dep.push(old_secret_ref.secret_id);
287                } else {
288                    // If the secret ref is the same, we don't need to update it.
289                    continue;
290                }
291            }
292        }
293
294        self.inner.extend(update_alter_props);
295        self.secret_ref.extend(update_alter_secret_refs);
296
297        Ok((to_add_secret_dep, to_remove_secret_dep))
298    }
299
300    /// Create a new [`WithOptions`] from a [`BTreeMap`].
301    pub fn without_secrets(inner: BTreeMap<String, String>) -> Self {
302        Self {
303            inner,
304            secret_ref: Default::default(),
305        }
306    }
307
308    /// Take the value of the option map and secret refs.
309    pub fn into_parts(self) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
310        (self.inner, self.secret_ref)
311    }
312
313    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
314        if let Some(inner_val) = self.inner.get(key)
315            && inner_val.eq_ignore_ascii_case(val)
316        {
317            return true;
318        }
319        false
320    }
321}
322
323/// For `planner_test` crate so that it does not depend directly on `connector` crate just for `SinkFormatDesc`.
324impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
325    type Error = crate::sink::SinkError;
326
327    fn try_from(value: &WithOptionsSecResolved) -> std::result::Result<Self, Self::Error> {
328        let connector = value.get(crate::sink::CONNECTOR_TYPE_KEY);
329        let r#type = value.get(crate::sink::SINK_TYPE_OPTION);
330        match (connector, r#type) {
331            (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t),
332            _ => Ok(None),
333        }
334    }
335}
336
337impl Get for WithOptionsSecResolved {
338    fn get(&self, key: &str) -> Option<&String> {
339        self.inner.get(key)
340    }
341}
342
343impl GetKeyIter for WithOptionsSecResolved {
344    fn key_iter(&self) -> impl Iterator<Item = &str> {
345        self.inner.keys().map(|s| s.as_str())
346    }
347}