risingwave_connector/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use risingwave_pb::secret::PbSecretRef;
20
21use crate::sink::catalog::SinkFormatDesc;
22use crate::source::cdc::MYSQL_CDC_CONNECTOR;
23use crate::source::cdc::external::CdcTableType;
24use crate::source::iceberg::ICEBERG_CONNECTOR;
25use crate::source::{
26    AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, LEGACY_S3_CONNECTOR, OPENDAL_S3_CONNECTOR,
27    POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
28};
29
30/// Marker trait for `WITH` options. Only for `#[derive(WithOptions)]`, should not be used manually.
31///
32/// This is used to ensure the `WITH` options types have reasonable structure.
33///
34/// TODO: add this bound for sink. There's a `SourceProperties` trait for sources, but no similar
35/// things for sinks.
36pub trait WithOptions {
37    #[doc(hidden)]
38    #[inline(always)]
39    fn assert_receiver_is_with_options(&self) {}
40}
41
42// Currently CDC properties are handled specially.
43// - It simply passes HashMap to Java DBZ.
44// - It's not handled by serde.
45// - It contains fields other than WITH options.
46// TODO: remove the workaround here. And also use #[derive] for it.
47
48impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
49    for crate::source::cdc::CdcProperties<T>
50{
51}
52
53// impl the trait for value types
54
55impl<T: WithOptions> WithOptions for Option<T> {}
56impl WithOptions for Vec<String> {}
57impl WithOptions for Vec<u64> {}
58impl WithOptions for HashMap<String, String> {}
59impl WithOptions for BTreeMap<String, String> {}
60
61impl WithOptions for String {}
62impl WithOptions for bool {}
63impl WithOptions for usize {}
64impl WithOptions for u8 {}
65impl WithOptions for u16 {}
66impl WithOptions for u32 {}
67impl WithOptions for u64 {}
68impl WithOptions for i32 {}
69impl WithOptions for i64 {}
70impl WithOptions for f64 {}
71impl WithOptions for std::time::Duration {}
72impl WithOptions for crate::connector_common::MqttQualityOfService {}
73impl WithOptions for crate::sink::file_sink::opendal_sink::PathPartitionPrefix {}
74impl WithOptions for crate::sink::kafka::CompressionCodec {}
75impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {}
76impl WithOptions for nexmark::config::RateShape {}
77impl WithOptions for nexmark::event::EventType {}
78impl<T> WithOptions for PhantomData<T> {}
79
/// Key-based lookup into a set of string properties.
pub trait Get {
    /// Returns the value stored under `key`, or `None` when the key is absent.
    fn get(&self, key: &str) -> Option<&String>;
}

/// Enumeration of the keys of a set of string properties.
pub trait GetKeyIter {
    /// Returns an iterator over all keys, each borrowed as a `&str`.
    fn key_iter(&self) -> impl Iterator<Item = &str>;
}

impl Get for HashMap<String, String> {
    fn get(&self, key: &str) -> Option<&String> {
        // Name the inherent method explicitly so this cannot be misread as a recursive call.
        HashMap::get(self, key)
    }
}

impl GetKeyIter for HashMap<String, String> {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.keys().map(String::as_str)
    }
}

impl Get for BTreeMap<String, String> {
    fn get(&self, key: &str) -> Option<&String> {
        // Name the inherent method explicitly so this cannot be misread as a recursive call.
        BTreeMap::get(self, key)
    }
}

impl GetKeyIter for BTreeMap<String, String> {
    fn key_iter(&self) -> impl Iterator<Item = &str> {
        self.keys().map(String::as_str)
    }
}
111
112/// Utility methods for `WITH` properties (`HashMap` and `BTreeMap`).
113pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
114    #[inline(always)]
115    fn get_connector(&self) -> Option<String> {
116        self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
117    }
118
119    #[inline(always)]
120    fn is_kafka_connector(&self) -> bool {
121        let Some(connector) = self.get_connector() else {
122            return false;
123        };
124        connector == KAFKA_CONNECTOR
125    }
126
127    #[inline(always)]
128    fn is_mysql_cdc_connector(&self) -> bool {
129        let Some(connector) = self.get_connector() else {
130            return false;
131        };
132        connector == MYSQL_CDC_CONNECTOR
133    }
134
135    #[inline(always)]
136    fn get_sync_call_timeout(&self) -> Option<Duration> {
137        const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; // only from kafka props, add more if needed
138        self.get(SYNC_CALL_TIMEOUT_KEY)
139            // ignore the error is ok here, because we will parse the field again when building the properties and has more precise error message
140            .and_then(|s| duration_str::parse_std(s).ok())
141    }
142
143    #[inline(always)]
144    fn is_cdc_connector(&self) -> bool {
145        let Some(connector) = self.get_connector() else {
146            return false;
147        };
148        connector.contains("-cdc")
149    }
150
151    /// It is shared when `CREATE SOURCE`, and not shared when `CREATE TABLE`. So called "shareable".
152    fn is_shareable_cdc_connector(&self) -> bool {
153        self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill()
154    }
155
156    /// Tables with MySQL and PostgreSQL connectors are maintained for backward compatibility.
157    /// The newly added SQL Server CDC connector is only supported when created as shared.
158    fn is_shareable_only_cdc_connector(&self) -> bool {
159        self.is_cdc_connector() && CdcTableType::from_properties(self).shareable_only()
160    }
161
162    fn enable_transaction_metadata(&self) -> bool {
163        CdcTableType::from_properties(self).enable_transaction_metadata()
164    }
165
166    fn is_shareable_non_cdc_connector(&self) -> bool {
167        self.is_kafka_connector()
168    }
169
170    #[inline(always)]
171    fn is_iceberg_connector(&self) -> bool {
172        let Some(connector) = self.get_connector() else {
173            return false;
174        };
175        connector == ICEBERG_CONNECTOR
176    }
177
178    fn connector_need_pk(&self) -> bool {
179        // Currently only iceberg connector doesn't need primary key
180        // introduced in https://github.com/risingwavelabs/risingwave/pull/14971
181        // XXX: This seems not the correct way. Iceberg doesn't necessarily lack a PK.
182        // "batch source" doesn't need a PK?
183        // For streaming, if it has a PK, do we want to use it? It seems not safe.
184        !self.is_iceberg_connector()
185    }
186
187    fn is_legacy_fs_connector(&self) -> bool {
188        self.get(UPSTREAM_SOURCE_KEY)
189            .map(|s| s.eq_ignore_ascii_case(LEGACY_S3_CONNECTOR))
190            .unwrap_or(false)
191    }
192
193    fn is_new_fs_connector(&self) -> bool {
194        self.get(UPSTREAM_SOURCE_KEY)
195            .map(|s| {
196                s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
197                    || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
198                    || s.eq_ignore_ascii_case(GCS_CONNECTOR)
199                    || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
200            })
201            .unwrap_or(false)
202    }
203}
204
205impl<T: Get + GetKeyIter> WithPropertiesExt for T {}
206
207/// Options or properties extracted from the `WITH` clause of DDLs.
208#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
209pub struct WithOptionsSecResolved {
210    inner: BTreeMap<String, String>,
211    secret_ref: BTreeMap<String, PbSecretRef>,
212}
213
214impl std::ops::Deref for WithOptionsSecResolved {
215    type Target = BTreeMap<String, String>;
216
217    fn deref(&self) -> &Self::Target {
218        &self.inner
219    }
220}
221
222impl std::ops::DerefMut for WithOptionsSecResolved {
223    fn deref_mut(&mut self) -> &mut Self::Target {
224        &mut self.inner
225    }
226}
227
228impl WithOptionsSecResolved {
229    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and resolved secret ref.
230    pub fn new(inner: BTreeMap<String, String>, secret_ref: BTreeMap<String, PbSecretRef>) -> Self {
231        Self { inner, secret_ref }
232    }
233
234    /// Create a new [`WithOptions`] from a [`BTreeMap`].
235    pub fn without_secrets(inner: BTreeMap<String, String>) -> Self {
236        Self {
237            inner,
238            secret_ref: Default::default(),
239        }
240    }
241
242    /// Take the value of the option map and secret refs.
243    pub fn into_parts(self) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
244        (self.inner, self.secret_ref)
245    }
246
247    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
248        if let Some(inner_val) = self.inner.get(key) {
249            if inner_val.eq_ignore_ascii_case(val) {
250                return true;
251            }
252        }
253        false
254    }
255}
256
257/// For `planner_test` crate so that it does not depend directly on `connector` crate just for `SinkFormatDesc`.
258impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
259    type Error = crate::sink::SinkError;
260
261    fn try_from(value: &WithOptionsSecResolved) -> std::result::Result<Self, Self::Error> {
262        let connector = value.get(crate::sink::CONNECTOR_TYPE_KEY);
263        let r#type = value.get(crate::sink::SINK_TYPE_OPTION);
264        match (connector, r#type) {
265            (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t),
266            _ => Ok(None),
267        }
268    }
269}
270
271impl Get for WithOptionsSecResolved {
272    fn get(&self, key: &str) -> Option<&String> {
273        self.inner.get(key)
274    }
275}
276
277impl GetKeyIter for WithOptionsSecResolved {
278    fn key_iter(&self) -> impl Iterator<Item = &str> {
279        self.inner.keys().map(|s| s.as_str())
280    }
281}