risingwave_connector/
with_options.rs1use std::collections::{BTreeMap, HashMap};
16use std::marker::PhantomData;
17use std::time::Duration;
18
19use risingwave_pb::secret::PbSecretRef;
20
21use crate::sink::catalog::SinkFormatDesc;
22use crate::source::cdc::MYSQL_CDC_CONNECTOR;
23use crate::source::cdc::external::CdcTableType;
24use crate::source::iceberg::ICEBERG_CONNECTOR;
25use crate::source::{
26 AZBLOB_CONNECTOR, GCS_CONNECTOR, KAFKA_CONNECTOR, LEGACY_S3_CONNECTOR, OPENDAL_S3_CONNECTOR,
27 POSIX_FS_CONNECTOR, UPSTREAM_SOURCE_KEY,
28};
29
30pub trait WithOptions {
37 #[doc(hidden)]
38 #[inline(always)]
39 fn assert_receiver_is_with_options(&self) {}
40}
41
42impl<T: crate::source::cdc::CdcSourceTypeTrait> WithOptions
49 for crate::source::cdc::CdcProperties<T>
50{
51}
52
53impl<T: WithOptions> WithOptions for Option<T> {}
56impl WithOptions for Vec<String> {}
57impl WithOptions for Vec<u64> {}
58impl WithOptions for HashMap<String, String> {}
59impl WithOptions for BTreeMap<String, String> {}
60
61impl WithOptions for String {}
62impl WithOptions for bool {}
63impl WithOptions for usize {}
64impl WithOptions for u8 {}
65impl WithOptions for u16 {}
66impl WithOptions for u32 {}
67impl WithOptions for u64 {}
68impl WithOptions for i32 {}
69impl WithOptions for i64 {}
70impl WithOptions for f64 {}
71impl WithOptions for std::time::Duration {}
72impl WithOptions for crate::connector_common::MqttQualityOfService {}
73impl WithOptions for crate::sink::file_sink::opendal_sink::PathPartitionPrefix {}
74impl WithOptions for crate::sink::kafka::CompressionCodec {}
75impl WithOptions for crate::source::filesystem::file_common::CompressionFormat {}
76impl WithOptions for nexmark::config::RateShape {}
77impl WithOptions for nexmark::event::EventType {}
78impl<T> WithOptions for PhantomData<T> {}
79
80pub trait Get {
81 fn get(&self, key: &str) -> Option<&String>;
82}
83
84pub trait GetKeyIter {
85 fn key_iter(&self) -> impl Iterator<Item = &str>;
86}
87
88impl GetKeyIter for HashMap<String, String> {
89 fn key_iter(&self) -> impl Iterator<Item = &str> {
90 self.keys().map(|s| s.as_str())
91 }
92}
93
94impl Get for HashMap<String, String> {
95 fn get(&self, key: &str) -> Option<&String> {
96 self.get(key)
97 }
98}
99
100impl Get for BTreeMap<String, String> {
101 fn get(&self, key: &str) -> Option<&String> {
102 self.get(key)
103 }
104}
105
106impl GetKeyIter for BTreeMap<String, String> {
107 fn key_iter(&self) -> impl Iterator<Item = &str> {
108 self.keys().map(|s| s.as_str())
109 }
110}
111
112pub trait WithPropertiesExt: Get + GetKeyIter + Sized {
114 #[inline(always)]
115 fn get_connector(&self) -> Option<String> {
116 self.get(UPSTREAM_SOURCE_KEY).map(|s| s.to_lowercase())
117 }
118
119 #[inline(always)]
120 fn is_kafka_connector(&self) -> bool {
121 let Some(connector) = self.get_connector() else {
122 return false;
123 };
124 connector == KAFKA_CONNECTOR
125 }
126
127 #[inline(always)]
128 fn is_mysql_cdc_connector(&self) -> bool {
129 let Some(connector) = self.get_connector() else {
130 return false;
131 };
132 connector == MYSQL_CDC_CONNECTOR
133 }
134
135 #[inline(always)]
136 fn get_sync_call_timeout(&self) -> Option<Duration> {
137 const SYNC_CALL_TIMEOUT_KEY: &str = "properties.sync.call.timeout"; self.get(SYNC_CALL_TIMEOUT_KEY)
139 .and_then(|s| duration_str::parse_std(s).ok())
141 }
142
143 #[inline(always)]
144 fn is_cdc_connector(&self) -> bool {
145 let Some(connector) = self.get_connector() else {
146 return false;
147 };
148 connector.contains("-cdc")
149 }
150
151 fn is_shareable_cdc_connector(&self) -> bool {
153 self.is_cdc_connector() && CdcTableType::from_properties(self).can_backfill()
154 }
155
156 fn is_shareable_only_cdc_connector(&self) -> bool {
159 self.is_cdc_connector() && CdcTableType::from_properties(self).shareable_only()
160 }
161
162 fn enable_transaction_metadata(&self) -> bool {
163 CdcTableType::from_properties(self).enable_transaction_metadata()
164 }
165
166 fn is_shareable_non_cdc_connector(&self) -> bool {
167 self.is_kafka_connector()
168 }
169
170 #[inline(always)]
171 fn is_iceberg_connector(&self) -> bool {
172 let Some(connector) = self.get_connector() else {
173 return false;
174 };
175 connector == ICEBERG_CONNECTOR
176 }
177
178 fn connector_need_pk(&self) -> bool {
179 !self.is_iceberg_connector()
185 }
186
187 fn is_legacy_fs_connector(&self) -> bool {
188 self.get(UPSTREAM_SOURCE_KEY)
189 .map(|s| s.eq_ignore_ascii_case(LEGACY_S3_CONNECTOR))
190 .unwrap_or(false)
191 }
192
193 fn is_new_fs_connector(&self) -> bool {
194 self.get(UPSTREAM_SOURCE_KEY)
195 .map(|s| {
196 s.eq_ignore_ascii_case(OPENDAL_S3_CONNECTOR)
197 || s.eq_ignore_ascii_case(POSIX_FS_CONNECTOR)
198 || s.eq_ignore_ascii_case(GCS_CONNECTOR)
199 || s.eq_ignore_ascii_case(AZBLOB_CONNECTOR)
200 })
201 .unwrap_or(false)
202 }
203}
204
205impl<T: Get + GetKeyIter> WithPropertiesExt for T {}
206
207#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
209pub struct WithOptionsSecResolved {
210 inner: BTreeMap<String, String>,
211 secret_ref: BTreeMap<String, PbSecretRef>,
212}
213
214impl std::ops::Deref for WithOptionsSecResolved {
215 type Target = BTreeMap<String, String>;
216
217 fn deref(&self) -> &Self::Target {
218 &self.inner
219 }
220}
221
222impl std::ops::DerefMut for WithOptionsSecResolved {
223 fn deref_mut(&mut self) -> &mut Self::Target {
224 &mut self.inner
225 }
226}
227
228impl WithOptionsSecResolved {
229 pub fn new(inner: BTreeMap<String, String>, secret_ref: BTreeMap<String, PbSecretRef>) -> Self {
231 Self { inner, secret_ref }
232 }
233
234 pub fn without_secrets(inner: BTreeMap<String, String>) -> Self {
236 Self {
237 inner,
238 secret_ref: Default::default(),
239 }
240 }
241
242 pub fn into_parts(self) -> (BTreeMap<String, String>, BTreeMap<String, PbSecretRef>) {
244 (self.inner, self.secret_ref)
245 }
246
247 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
248 if let Some(inner_val) = self.inner.get(key) {
249 if inner_val.eq_ignore_ascii_case(val) {
250 return true;
251 }
252 }
253 false
254 }
255}
256
257impl TryFrom<&WithOptionsSecResolved> for Option<SinkFormatDesc> {
259 type Error = crate::sink::SinkError;
260
261 fn try_from(value: &WithOptionsSecResolved) -> std::result::Result<Self, Self::Error> {
262 let connector = value.get(crate::sink::CONNECTOR_TYPE_KEY);
263 let r#type = value.get(crate::sink::SINK_TYPE_OPTION);
264 match (connector, r#type) {
265 (Some(c), Some(t)) => SinkFormatDesc::from_legacy_type(c, t),
266 _ => Ok(None),
267 }
268 }
269}
270
271impl Get for WithOptionsSecResolved {
272 fn get(&self, key: &str) -> Option<&String> {
273 self.inner.get(key)
274 }
275}
276
277impl GetKeyIter for WithOptionsSecResolved {
278 fn key_iter(&self) -> impl Iterator<Item = &str> {
279 self.inner.keys().map(|s| s.as_str())
280 }
281}