1use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::WithPropertiesExt;
21use risingwave_connector::connector_common::{
22 PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
23};
24use risingwave_connector::source::kafka::private_link::{
25 PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
26};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33 ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
34 CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, SqlOptionValue,
35 Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
45pub mod options {
46 pub const RETENTION_SECONDS: &str = "retention_seconds";
47}
48
49#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52 inner: BTreeMap<String, String>,
53 secret_ref: BTreeMap<String, SecretRefValue>,
54 connection_ref: BTreeMap<String, ConnectionRefValue>,
55}
56
57impl std::ops::Deref for WithOptions {
58 type Target = BTreeMap<String, String>;
59
60 fn deref(&self) -> &Self::Target {
61 &self.inner
62 }
63}
64
65impl std::ops::DerefMut for WithOptions {
66 fn deref_mut(&mut self) -> &mut Self::Target {
67 &mut self.inner
68 }
69}
70
71impl WithOptions {
72 pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
74 Self {
75 inner,
76 secret_ref: Default::default(),
77 connection_ref: Default::default(),
78 }
79 }
80
81 pub fn new(
83 inner: BTreeMap<String, String>,
84 secret_ref: BTreeMap<String, SecretRefValue>,
85 connection_ref: BTreeMap<String, ConnectionRefValue>,
86 ) -> Self {
87 Self {
88 inner,
89 secret_ref,
90 connection_ref,
91 }
92 }
93
94 pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
95 &mut self.inner
96 }
97
98 pub fn into_parts(
100 self,
101 ) -> (
102 BTreeMap<String, String>,
103 BTreeMap<String, SecretRefValue>,
104 BTreeMap<String, ConnectionRefValue>,
105 ) {
106 (self.inner, self.secret_ref, self.connection_ref)
107 }
108
109 pub fn into_connector_props(self) -> WithOptions {
111 let inner = self
112 .inner
113 .into_iter()
114 .filter(|(key, _)| {
115 key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
116 })
117 .collect();
118
119 Self {
120 inner,
121 secret_ref: self.secret_ref,
122 connection_ref: self.connection_ref,
123 }
124 }
125
126 pub fn retention_seconds(&self) -> Option<NonZeroU32> {
128 self.inner
129 .get(options::RETENTION_SECONDS)
130 .and_then(|s| s.parse().ok())
131 }
132
133 pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
135 let inner = keys
136 .into_iter()
137 .filter_map(|k| {
138 self.inner
139 .get_key_value(k.as_ref())
140 .map(|(k, v)| (k.clone(), v.clone()))
141 })
142 .collect();
143
144 Self {
145 inner,
146 secret_ref: self.secret_ref.clone(),
147 connection_ref: self.connection_ref.clone(),
148 }
149 }
150
151 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
152 if let Some(inner_val) = self.inner.get(key) {
153 if inner_val.eq_ignore_ascii_case(val) {
154 return true;
155 }
156 }
157 false
158 }
159
160 pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
161 &self.secret_ref
162 }
163
164 pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
165 &mut self.secret_ref
166 }
167
168 pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
169 &self.connection_ref
170 }
171
172 pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
173 &mut self.connection_ref
174 }
175
176 pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
177 let WithOptions {
178 inner, secret_ref, ..
179 } = WithOptions::try_from(sql_options)?;
180 if secret_ref.is_empty() {
181 Ok(inner)
182 } else {
183 Err(RwError::from(ErrorCode::InvalidParameterValue(
184 "Secret reference is not allowed in OAuth options".to_owned(),
185 )))
186 }
187 }
188
189 pub fn is_source_connector(&self) -> bool {
190 self.inner.contains_key(UPSTREAM_SOURCE_KEY)
191 && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
192 }
193}
194
195pub(crate) fn resolve_connection_ref_and_secret_ref(
196 with_options: WithOptions,
197 session: &SessionImpl,
198 object: TelemetryDatabaseObject,
199) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
200 let connector_name = with_options.get_connector();
201 let db_name: &str = &session.database();
202 let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
203
204 let mut connection_id = None;
205 let mut connection_params = None;
206 for connection_ref in connection_refs.values() {
207 connection_params = {
209 let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
211 db_name,
212 connection_ref.connection_name.clone(),
213 )?;
214 let connection_catalog =
215 session.get_connection_by_name(schema_name, &connection_name)?;
216 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
217 connection_id = Some(connection_catalog.id);
218 Some(params.clone())
219 } else {
220 return Err(RwError::from(ErrorCode::InvalidParameterValue(
221 "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
222 )));
223 }
224 };
225
226 report_event(
228 PbTelemetryEventStage::CreateStreamJob,
229 "connection_ref",
230 0,
231 connector_name.clone(),
232 Some(object),
233 {
234 connection_params.as_ref().map(|cp| {
235 jsonbb::json!({
236 "connection_type": cp.connection_type().as_str_name().to_owned()
237 })
238 })
239 },
240 );
241 }
242
243 let mut inner_secret_refs = {
244 let mut resolved_secret_refs = BTreeMap::new();
245 for (key, secret_ref) in secret_refs {
246 let (schema_name, secret_name) =
247 Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
248 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
249 let ref_as = match secret_ref.ref_as {
250 SecretRefAsType::Text => PbRefAsType::Text,
251 SecretRefAsType::File => PbRefAsType::File,
252 };
253 let pb_secret_ref = PbSecretRef {
254 secret_id: secret_catalog.id.secret_id(),
255 ref_as: ref_as.into(),
256 };
257 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
258 }
259 resolved_secret_refs
260 };
261
262 let mut connection_type = PbConnectionType::Unspecified;
263 let connection_params_is_none_flag = connection_params.is_none();
264
265 if let Some(connection_params) = connection_params {
266 if let Some(broker_rewrite_map) = connection_params
274 .get_properties()
275 .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
276 {
277 if options.contains_key(PRIVATE_LINK_TARGETS_KEY)
278 || options.contains_key(PRIVATELINK_ENDPOINT_KEY)
279 {
280 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
281 "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
282 broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
283 ))));
284 }
285 }
286
287 connection_type = connection_params.connection_type();
288 for (k, v) in connection_params.properties {
289 if options.insert(k.clone(), v).is_some() {
290 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
291 "Duplicated key in both WITH clause and Connection catalog: {}",
292 k
293 ))));
294 }
295 }
296
297 for (k, v) in connection_params.secret_refs {
298 if inner_secret_refs.insert(k.clone(), v).is_some() {
299 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
300 "Duplicated key in both WITH clause and Connection catalog: {}",
301 k
302 ))));
303 }
304 }
305
306 {
307 if connection_type == PbConnectionType::SchemaRegistry {
309 if options
311 .keys()
312 .chain(inner_secret_refs.keys())
313 .any(|k| k.starts_with("aws"))
314 {
315 return Err(RwError::from(ErrorCode::InvalidParameterValue(
316 "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
317 )));
318 }
319 }
320 }
321 }
322
323 if connection_params_is_none_flag {
325 debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
326 }
327 Ok((
328 WithOptionsSecResolved::new(options, inner_secret_refs),
329 connection_type,
330 connection_id,
331 ))
332}
333
334pub(crate) fn resolve_secret_ref_in_with_options(
336 with_options: WithOptions,
337 session: &SessionImpl,
338) -> RwResult<WithOptionsSecResolved> {
339 let (options, secret_refs, _) = with_options.into_parts();
340 let mut resolved_secret_refs = BTreeMap::new();
341 let db_name: &str = &session.database();
342 for (key, secret_ref) in secret_refs {
343 let (schema_name, secret_name) =
344 Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
345 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
346 let ref_as = match secret_ref.ref_as {
347 SecretRefAsType::Text => PbRefAsType::Text,
348 SecretRefAsType::File => PbRefAsType::File,
349 };
350 let pb_secret_ref = PbSecretRef {
351 secret_id: secret_catalog.id.secret_id(),
352 ref_as: ref_as.into(),
353 };
354 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
355 }
356 Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
357}
358
359pub(crate) fn resolve_privatelink_in_with_option(
360 with_options: &mut WithOptions,
361) -> RwResult<Option<ConnectionId>> {
362 let is_kafka = with_options.is_kafka_connector();
363 let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
364
365 if let Some(endpoint) = privatelink_endpoint {
367 if !is_kafka {
368 return Err(RwError::from(ErrorCode::ProtocolError(
369 "Privatelink is only supported in kafka connector".to_owned(),
370 )));
371 }
372 insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
373 .map_err(RwError::from)?;
374 }
375 Ok(None)
376}
377
378impl TryFrom<&[SqlOption]> for WithOptions {
379 type Error = RwError;
380
381 fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
382 let mut inner: BTreeMap<String, String> = BTreeMap::new();
383 let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
384 let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
385 for option in options {
386 let key = option.name.real_value();
387 match &option.value {
388 SqlOptionValue::SecretRef(r) => {
389 if secret_ref.insert(key.clone(), r.clone()).is_some()
390 || inner.contains_key(&key)
391 {
392 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
393 "Duplicated option: {}",
394 key
395 ))));
396 }
397 continue;
398 }
399 SqlOptionValue::ConnectionRef(r) => {
400 if connection_ref.insert(key.clone(), r.clone()).is_some()
401 || inner.contains_key(&key)
402 {
403 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
404 "Duplicated option: {}",
405 key
406 ))));
407 }
408 continue;
409 }
410 _ => {}
411 }
412 let value: String = match option.value.clone() {
413 SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
414 SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
415 SqlOptionValue::Value(Value::Number(n)) => n,
416 SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
417 _ => {
418 return Err(RwError::from(ErrorCode::InvalidParameterValue(
419 "`with options` or `with properties` only support single quoted string value and C style escaped string"
420 .to_owned(),
421 )))
422 }
423 };
424 if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
425 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
426 "Duplicated option: {}",
427 key
428 ))));
429 }
430 }
431
432 Ok(Self {
433 inner,
434 secret_ref,
435 connection_ref,
436 })
437 }
438}
439
440impl TryFrom<&Statement> for WithOptions {
441 type Error = RwError;
442
443 fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
445 match statement {
446 Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
448
449 Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
451
452 Statement::CreateSink {
454 stmt:
455 CreateSinkStatement {
456 with_properties, ..
457 },
458 }
459 | Statement::CreateConnection {
460 stmt:
461 CreateConnectionStatement {
462 with_properties, ..
463 },
464 } => Self::try_from(with_properties.0.as_slice()),
465 Statement::CreateSource {
466 stmt:
467 CreateSourceStatement {
468 with_properties, ..
469 },
470 ..
471 } => Self::try_from(with_properties.0.as_slice()),
472 Statement::CreateSubscription {
473 stmt:
474 CreateSubscriptionStatement {
475 with_properties, ..
476 },
477 ..
478 } => Self::try_from(with_properties.0.as_slice()),
479 Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
480
481 _ => Ok(Default::default()),
482 }
483 }
484}