1use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21 PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24 PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33 ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
34 CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, SqlOptionValue,
35 Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
45pub mod options {
46 pub const RETENTION_SECONDS: &str = "retention_seconds";
47}
48
49#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52 inner: BTreeMap<String, String>,
53 secret_ref: BTreeMap<String, SecretRefValue>,
54 connection_ref: BTreeMap<String, ConnectionRefValue>,
55}
56
57impl GetKeyIter for WithOptions {
58 fn key_iter(&self) -> impl Iterator<Item = &str> {
59 self.inner.keys().map(|s| s.as_str())
60 }
61}
62
63impl Get for WithOptions {
64 fn get(&self, key: &str) -> Option<&String> {
65 self.inner.get(key)
66 }
67}
68
69impl std::ops::Deref for WithOptions {
70 type Target = BTreeMap<String, String>;
71
72 fn deref(&self) -> &Self::Target {
73 &self.inner
74 }
75}
76
77impl std::ops::DerefMut for WithOptions {
78 fn deref_mut(&mut self) -> &mut Self::Target {
79 &mut self.inner
80 }
81}
82
83impl WithOptions {
84 pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
86 Self {
87 inner,
88 secret_ref: Default::default(),
89 connection_ref: Default::default(),
90 }
91 }
92
93 pub fn new(
95 inner: BTreeMap<String, String>,
96 secret_ref: BTreeMap<String, SecretRefValue>,
97 connection_ref: BTreeMap<String, ConnectionRefValue>,
98 ) -> Self {
99 Self {
100 inner,
101 secret_ref,
102 connection_ref,
103 }
104 }
105
106 pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
107 &mut self.inner
108 }
109
110 pub fn into_parts(
112 self,
113 ) -> (
114 BTreeMap<String, String>,
115 BTreeMap<String, SecretRefValue>,
116 BTreeMap<String, ConnectionRefValue>,
117 ) {
118 (self.inner, self.secret_ref, self.connection_ref)
119 }
120
121 pub fn into_connector_props(self) -> WithOptions {
123 let inner = self
124 .inner
125 .into_iter()
126 .filter(|(key, _)| {
127 key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
128 })
129 .collect();
130
131 Self {
132 inner,
133 secret_ref: self.secret_ref,
134 connection_ref: self.connection_ref,
135 }
136 }
137
138 pub fn retention_seconds(&self) -> Option<NonZeroU32> {
140 self.inner
141 .get(options::RETENTION_SECONDS)
142 .and_then(|s| s.parse().ok())
143 }
144
145 pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
147 let inner = keys
148 .into_iter()
149 .filter_map(|k| {
150 self.inner
151 .get_key_value(k.as_ref())
152 .map(|(k, v)| (k.clone(), v.clone()))
153 })
154 .collect();
155
156 Self {
157 inner,
158 secret_ref: self.secret_ref.clone(),
159 connection_ref: self.connection_ref.clone(),
160 }
161 }
162
163 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
164 if let Some(inner_val) = self.inner.get(key) {
165 if inner_val.eq_ignore_ascii_case(val) {
166 return true;
167 }
168 }
169 false
170 }
171
172 pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
173 &self.secret_ref
174 }
175
176 pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
177 &mut self.secret_ref
178 }
179
180 pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
181 &self.connection_ref
182 }
183
184 pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
185 &mut self.connection_ref
186 }
187
188 pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
189 let WithOptions {
190 inner, secret_ref, ..
191 } = WithOptions::try_from(sql_options)?;
192 if secret_ref.is_empty() {
193 Ok(inner)
194 } else {
195 Err(RwError::from(ErrorCode::InvalidParameterValue(
196 "Secret reference is not allowed in OAuth options".to_owned(),
197 )))
198 }
199 }
200
201 pub fn is_source_connector(&self) -> bool {
202 self.inner.contains_key(UPSTREAM_SOURCE_KEY)
203 && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
204 }
205}
206
207pub(crate) fn resolve_connection_ref_and_secret_ref(
208 with_options: WithOptions,
209 session: &SessionImpl,
210 object: Option<TelemetryDatabaseObject>,
211) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
212 let connector_name = with_options.get_connector();
213 let db_name: &str = &session.database();
214 let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
215
216 let mut connection_id = None;
217 let mut connection_params = None;
218 for connection_ref in connection_refs.values() {
219 connection_params = {
221 let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
223 db_name,
224 connection_ref.connection_name.clone(),
225 )?;
226 let connection_catalog =
227 session.get_connection_by_name(schema_name, &connection_name)?;
228 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
229 connection_id = Some(connection_catalog.id);
230 Some(params.clone())
231 } else {
232 return Err(RwError::from(ErrorCode::InvalidParameterValue(
233 "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
234 )));
235 }
236 };
237
238 if let Some(object) = object {
239 report_event(
241 PbTelemetryEventStage::CreateStreamJob,
242 "connection_ref",
243 0,
244 connector_name.clone(),
245 Some(object),
246 {
247 connection_params.as_ref().map(|cp| {
248 jsonbb::json!({
249 "connection_type": cp.connection_type().as_str_name().to_owned()
250 })
251 })
252 },
253 );
254 }
255 }
256
257 let mut inner_secret_refs = {
258 let mut resolved_secret_refs = BTreeMap::new();
259 for (key, secret_ref) in secret_refs {
260 let (schema_name, secret_name) =
261 Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
262 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
263 let ref_as = match secret_ref.ref_as {
264 SecretRefAsType::Text => PbRefAsType::Text,
265 SecretRefAsType::File => PbRefAsType::File,
266 };
267 let pb_secret_ref = PbSecretRef {
268 secret_id: secret_catalog.id.secret_id(),
269 ref_as: ref_as.into(),
270 };
271 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
272 }
273 resolved_secret_refs
274 };
275
276 let mut connection_type = PbConnectionType::Unspecified;
277 let connection_params_is_none_flag = connection_params.is_none();
278
279 if let Some(connection_params) = connection_params {
280 if let Some(broker_rewrite_map) = connection_params
288 .get_properties()
289 .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
290 {
291 if options.contains_key(PRIVATE_LINK_TARGETS_KEY)
292 || options.contains_key(PRIVATELINK_ENDPOINT_KEY)
293 {
294 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
295 "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
296 broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
297 ))));
298 }
299 }
300
301 connection_type = connection_params.connection_type();
302 for (k, v) in connection_params.properties {
303 if options.insert(k.clone(), v).is_some() {
304 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
305 "Duplicated key in both WITH clause and Connection catalog: {}",
306 k
307 ))));
308 }
309 }
310
311 for (k, v) in connection_params.secret_refs {
312 if inner_secret_refs.insert(k.clone(), v).is_some() {
313 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
314 "Duplicated key in both WITH clause and Connection catalog: {}",
315 k
316 ))));
317 }
318 }
319
320 {
321 if connection_type == PbConnectionType::SchemaRegistry {
323 if options
325 .keys()
326 .chain(inner_secret_refs.keys())
327 .any(|k| k.starts_with("aws"))
328 {
329 return Err(RwError::from(ErrorCode::InvalidParameterValue(
330 "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
331 )));
332 }
333 }
334 }
335 }
336
337 if connection_params_is_none_flag {
339 debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
340 }
341 Ok((
342 WithOptionsSecResolved::new(options, inner_secret_refs),
343 connection_type,
344 connection_id,
345 ))
346}
347
348pub(crate) fn resolve_secret_ref_in_with_options(
350 with_options: WithOptions,
351 session: &SessionImpl,
352) -> RwResult<WithOptionsSecResolved> {
353 let (options, secret_refs, _) = with_options.into_parts();
354 let mut resolved_secret_refs = BTreeMap::new();
355 let db_name: &str = &session.database();
356 for (key, secret_ref) in secret_refs {
357 let (schema_name, secret_name) =
358 Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
359 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
360 let ref_as = match secret_ref.ref_as {
361 SecretRefAsType::Text => PbRefAsType::Text,
362 SecretRefAsType::File => PbRefAsType::File,
363 };
364 let pb_secret_ref = PbSecretRef {
365 secret_id: secret_catalog.id.secret_id(),
366 ref_as: ref_as.into(),
367 };
368 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
369 }
370 Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
371}
372
373pub(crate) fn resolve_privatelink_in_with_option(
374 with_options: &mut WithOptions,
375) -> RwResult<Option<ConnectionId>> {
376 let is_kafka = with_options.is_kafka_connector();
377 let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
378
379 if let Some(endpoint) = privatelink_endpoint {
381 if !is_kafka {
382 return Err(RwError::from(ErrorCode::ProtocolError(
383 "Privatelink is only supported in kafka connector".to_owned(),
384 )));
385 }
386 insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
387 .map_err(RwError::from)?;
388 }
389 Ok(None)
390}
391
392impl TryFrom<&[SqlOption]> for WithOptions {
393 type Error = RwError;
394
395 fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
396 let mut inner: BTreeMap<String, String> = BTreeMap::new();
397 let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
398 let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
399 for option in options {
400 let key = option.name.real_value();
401 match &option.value {
402 SqlOptionValue::SecretRef(r) => {
403 if secret_ref.insert(key.clone(), r.clone()).is_some()
404 || inner.contains_key(&key)
405 {
406 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
407 "Duplicated option: {}",
408 key
409 ))));
410 }
411 continue;
412 }
413 SqlOptionValue::ConnectionRef(r) => {
414 if connection_ref.insert(key.clone(), r.clone()).is_some()
415 || inner.contains_key(&key)
416 {
417 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
418 "Duplicated option: {}",
419 key
420 ))));
421 }
422 continue;
423 }
424 _ => {}
425 }
426 let value: String = match option.value.clone() {
427 SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
428 SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
429 SqlOptionValue::Value(Value::Number(n)) => n,
430 SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
431 _ => {
432 return Err(RwError::from(ErrorCode::InvalidParameterValue(
433 "`with options` or `with properties` only support single quoted string value and C style escaped string"
434 .to_owned(),
435 )))
436 }
437 };
438 if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
439 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
440 "Duplicated option: {}",
441 key
442 ))));
443 }
444 }
445
446 Ok(Self {
447 inner,
448 secret_ref,
449 connection_ref,
450 })
451 }
452}
453
454impl TryFrom<&Statement> for WithOptions {
455 type Error = RwError;
456
457 fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
459 match statement {
460 Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
462
463 Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
465
466 Statement::CreateSink {
468 stmt:
469 CreateSinkStatement {
470 with_properties, ..
471 },
472 }
473 | Statement::CreateConnection {
474 stmt:
475 CreateConnectionStatement {
476 with_properties, ..
477 },
478 } => Self::try_from(with_properties.0.as_slice()),
479 Statement::CreateSource {
480 stmt:
481 CreateSourceStatement {
482 with_properties, ..
483 },
484 ..
485 } => Self::try_from(with_properties.0.as_slice()),
486 Statement::CreateSubscription {
487 stmt:
488 CreateSubscriptionStatement {
489 with_properties, ..
490 },
491 ..
492 } => Self::try_from(with_properties.0.as_slice()),
493 Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
494
495 _ => Ok(Default::default()),
496 }
497 }
498}