1use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21 PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24 PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::plan_common::SourceRefreshMode;
30use risingwave_pb::plan_common::source_refresh_mode::{
31 RefreshMode, SourceRefreshModeFullRecompute, SourceRefreshModeStreaming,
32};
33use risingwave_pb::secret::PbSecretRef;
34use risingwave_pb::secret::secret_ref::PbRefAsType;
35use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
36use risingwave_sqlparser::ast::{
37 BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
38 CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
39 SqlOptionValue, Statement, Value,
40};
41
42use super::OverwriteOptions;
43use crate::Binder;
44use crate::error::{ErrorCode, Result as RwResult, RwError};
45use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
46use crate::session::SessionImpl;
47use crate::telemetry::report_event;
48
/// Keys of options that are interpreted by the frontend itself.
pub mod options {
    /// Key of the option holding the retention period, in seconds.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}

/// Key of the option that selects the refresh mode of a source.
pub const SOURCE_REFRESH_MODE_KEY: &str = "refresh_mode";
54
55#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
57pub struct WithOptions {
58 inner: BTreeMap<String, String>,
59 secret_ref: BTreeMap<String, SecretRefValue>,
60 connection_ref: BTreeMap<String, ConnectionRefValue>,
61 backfill_order_strategy: BackfillOrderStrategy,
62}
63
64impl GetKeyIter for WithOptions {
65 fn key_iter(&self) -> impl Iterator<Item = &str> {
66 self.inner.keys().map(|s| s.as_str())
67 }
68}
69
70impl Get for WithOptions {
71 fn get(&self, key: &str) -> Option<&String> {
72 self.inner.get(key)
73 }
74}
75
76impl std::ops::Deref for WithOptions {
77 type Target = BTreeMap<String, String>;
78
79 fn deref(&self) -> &Self::Target {
80 &self.inner
81 }
82}
83
84impl std::ops::DerefMut for WithOptions {
85 fn deref_mut(&mut self) -> &mut Self::Target {
86 &mut self.inner
87 }
88}
89
90impl WithOptions {
91 pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
93 Self {
94 inner,
95 secret_ref: Default::default(),
96 connection_ref: Default::default(),
97 backfill_order_strategy: Default::default(),
98 }
99 }
100
101 pub fn new(
103 inner: BTreeMap<String, String>,
104 secret_ref: BTreeMap<String, SecretRefValue>,
105 connection_ref: BTreeMap<String, ConnectionRefValue>,
106 ) -> Self {
107 Self {
108 inner,
109 secret_ref,
110 connection_ref,
111 backfill_order_strategy: Default::default(),
112 }
113 }
114
115 pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
116 &mut self.inner
117 }
118
119 pub fn into_parts(
121 self,
122 ) -> (
123 BTreeMap<String, String>,
124 BTreeMap<String, SecretRefValue>,
125 BTreeMap<String, ConnectionRefValue>,
126 ) {
127 (self.inner, self.secret_ref, self.connection_ref)
128 }
129
130 pub fn into_connector_props(self) -> WithOptions {
132 let inner = self
133 .inner
134 .into_iter()
135 .filter(|(key, _)| {
136 key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
137 })
138 .collect();
139
140 Self {
141 inner,
142 secret_ref: self.secret_ref,
143 connection_ref: self.connection_ref,
144 backfill_order_strategy: self.backfill_order_strategy,
145 }
146 }
147
148 pub fn retention_seconds(&self) -> Option<NonZeroU32> {
150 self.inner
151 .get(options::RETENTION_SECONDS)
152 .and_then(|s| s.parse().ok())
153 }
154
155 pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
157 let inner = keys
158 .into_iter()
159 .filter_map(|k| {
160 self.inner
161 .get_key_value(k.as_ref())
162 .map(|(k, v)| (k.clone(), v.clone()))
163 })
164 .collect();
165
166 Self {
167 inner,
168 secret_ref: self.secret_ref.clone(),
169 connection_ref: self.connection_ref.clone(),
170 backfill_order_strategy: self.backfill_order_strategy.clone(),
171 }
172 }
173
174 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
175 if let Some(inner_val) = self.inner.get(key)
176 && inner_val.eq_ignore_ascii_case(val)
177 {
178 return true;
179 }
180 false
181 }
182
183 pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
184 &self.secret_ref
185 }
186
187 pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
188 &mut self.secret_ref
189 }
190
191 pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
192 &self.connection_ref
193 }
194
195 pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
196 &mut self.connection_ref
197 }
198
199 pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
200 let WithOptions {
201 inner, secret_ref, ..
202 } = WithOptions::try_from(sql_options)?;
203 if secret_ref.is_empty() {
204 Ok(inner)
205 } else {
206 Err(RwError::from(ErrorCode::InvalidParameterValue(
207 "Secret reference is not allowed in OAuth options".to_owned(),
208 )))
209 }
210 }
211
212 pub fn is_source_connector(&self) -> bool {
213 self.inner.contains_key(UPSTREAM_SOURCE_KEY)
214 && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
215 }
216
217 pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
218 self.backfill_order_strategy.clone()
219 }
220}
221
222pub(crate) fn resolve_connection_ref_and_secret_ref(
247 with_options: WithOptions,
248 session: &SessionImpl,
249 object: Option<TelemetryDatabaseObject>,
250) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
251 let connector_name = with_options.get_connector();
252 let db_name: &str = &session.database();
253 let (mut options, secret_refs, connection_refs) = with_options.into_parts();
254
255 let mut connection_id = None;
256 let mut connection_params = None;
257 for connection_ref in connection_refs.values() {
258 connection_params = {
260 let (schema_name, connection_name) =
262 Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
263 let connection_catalog =
264 session.get_connection_by_name(schema_name, &connection_name)?;
265 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
266 connection_id = Some(connection_catalog.id);
267 Some(params.clone())
268 } else {
269 return Err(RwError::from(ErrorCode::InvalidParameterValue(
270 "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
271 )));
272 }
273 };
274
275 if let Some(object) = object {
277 report_event(
279 PbTelemetryEventStage::CreateStreamJob,
280 "connection_ref",
281 0,
282 connector_name.clone(),
283 Some(object),
284 {
285 connection_params.as_ref().map(|cp| {
286 jsonbb::json!({
287 "connection_type": cp.connection_type().as_str_name().to_owned()
288 })
289 })
290 },
291 );
292 }
293 }
294
295 let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
296
297 let mut connection_type = PbConnectionType::Unspecified;
299 let connection_params_is_none_flag = connection_params.is_none();
300
301 if let Some(connection_params) = connection_params {
303 if let Some(broker_rewrite_map) = connection_params
311 .get_properties()
312 .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
313 && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
314 || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
315 {
316 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
317 "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
318 broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
319 ))));
320 }
321
322 connection_type = connection_params.connection_type();
324 for (k, v) in connection_params.properties {
325 if options.insert(k.clone(), v).is_some() {
326 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
327 "Duplicated key in both WITH clause and Connection catalog: {}",
328 k
329 ))));
330 }
331 }
332
333 for (k, v) in connection_params.secret_refs {
335 if inner_secret_refs.insert(k.clone(), v).is_some() {
336 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
337 "Duplicated key in both WITH clause and Connection catalog: {}",
338 k
339 ))));
340 }
341 }
342
343 {
345 if connection_type == PbConnectionType::SchemaRegistry {
346 if options
348 .keys()
349 .chain(inner_secret_refs.keys())
350 .any(|k| k.starts_with("aws"))
351 {
352 return Err(RwError::from(ErrorCode::InvalidParameterValue(
353 "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
354 )));
355 }
356 }
357 }
358 }
359
360 if connection_params_is_none_flag {
362 debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
363 }
364 Ok((
365 WithOptionsSecResolved::new(options, inner_secret_refs),
366 connection_type,
367 connection_id,
368 ))
369}
370
371pub(crate) fn resolve_secret_ref_in_with_options(
373 with_options: WithOptions,
374 session: &SessionImpl,
375) -> RwResult<WithOptionsSecResolved> {
376 let (options, secret_refs, _) = with_options.into_parts();
377 let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
378 Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
379}
380
381fn resolve_secret_refs_inner(
382 secret_refs: BTreeMap<String, SecretRefValue>,
383 session: &SessionImpl,
384) -> RwResult<BTreeMap<String, PbSecretRef>> {
385 let db_name: &str = &session.database();
386 let mut resolved_secret_refs = BTreeMap::new();
387 for (key, secret_ref) in secret_refs {
388 let (schema_name, secret_name) =
389 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
390 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
391 let ref_as = match secret_ref.ref_as {
392 SecretRefAsType::Text => PbRefAsType::Text,
393 SecretRefAsType::File => PbRefAsType::File,
394 };
395 let pb_secret_ref = PbSecretRef {
396 secret_id: secret_catalog.id.secret_id(),
397 ref_as: ref_as.into(),
398 };
399 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
400 }
401 Ok(resolved_secret_refs)
402}
403
404pub(crate) fn resolve_source_refresh_mode_in_with_option(
405 with_options: &mut WithOptions,
406) -> RwResult<Option<SourceRefreshMode>> {
407 let source_refresh_mode = if let Some(source_refresh_mode_str) =
408 with_options.remove(SOURCE_REFRESH_MODE_KEY)
409 {
410 match source_refresh_mode_str.to_uppercase().as_str() {
411 "STREAMING" => SourceRefreshMode {
412 refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
413 },
414 "FULL_RECOMPUTE" => SourceRefreshMode {
415 refresh_mode: Some(RefreshMode::FullRecompute(
416 SourceRefreshModeFullRecompute {},
417 )),
418 },
419 _ => {
420 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
421 "Invalid key `{}`: {}, accepted values are 'STREAMING' and 'FULL_RECOMPUTE'",
422 SOURCE_REFRESH_MODE_KEY, source_refresh_mode_str
423 ))));
424 }
425 }
426 } else {
427 return Ok(None);
428 };
429 Ok(Some(source_refresh_mode))
430}
431
432pub(crate) fn resolve_privatelink_in_with_option(
433 with_options: &mut WithOptions,
434) -> RwResult<Option<ConnectionId>> {
435 let is_kafka = with_options.is_kafka_connector();
436 let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
437
438 if let Some(endpoint) = privatelink_endpoint {
440 if !is_kafka {
441 return Err(RwError::from(ErrorCode::ProtocolError(
442 "Privatelink is only supported in kafka connector".to_owned(),
443 )));
444 }
445 insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
446 .map_err(RwError::from)?;
447 }
448 Ok(None)
449}
450
451impl TryFrom<&[SqlOption]> for WithOptions {
452 type Error = RwError;
453
454 fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
455 let mut inner: BTreeMap<String, String> = BTreeMap::new();
456 let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
457 let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
458 let mut backfill_order_strategy = BackfillOrderStrategy::Default;
459 for option in options {
460 let key = option.name.real_value();
461 match &option.value {
462 SqlOptionValue::SecretRef(r) => {
463 if secret_ref.insert(key.clone(), r.clone()).is_some()
464 || inner.contains_key(&key)
465 {
466 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
467 "Duplicated option: {}",
468 key
469 ))));
470 }
471 continue;
472 }
473 SqlOptionValue::ConnectionRef(r) => {
474 if connection_ref.insert(key.clone(), r.clone()).is_some()
475 || inner.contains_key(&key)
476 {
477 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
478 "Duplicated option: {}",
479 key
480 ))));
481 }
482 continue;
483 }
484 SqlOptionValue::BackfillOrder(b) => {
485 backfill_order_strategy = b.clone();
486 continue;
487 }
488 _ => {}
489 }
490 let value: String = match option.value.clone() {
491 SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
492 SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
493 SqlOptionValue::Value(Value::Number(n)) => n,
494 SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
495 _ => {
496 return Err(RwError::from(ErrorCode::InvalidParameterValue(
497 "`with options` or `with properties` only support single quoted string value and C style escaped string"
498 .to_owned(),
499 )))
500 }
501 };
502 if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
503 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
504 "Duplicated option: {}",
505 key
506 ))));
507 }
508 }
509
510 Ok(Self {
511 inner,
512 secret_ref,
513 connection_ref,
514 backfill_order_strategy,
515 })
516 }
517}
518
519impl TryFrom<&Statement> for WithOptions {
520 type Error = RwError;
521
522 fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
524 match statement {
525 Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
527
528 Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
530
531 Statement::CreateSink {
533 stmt:
534 CreateSinkStatement {
535 with_properties, ..
536 },
537 }
538 | Statement::CreateConnection {
539 stmt:
540 CreateConnectionStatement {
541 with_properties, ..
542 },
543 }
544 | Statement::CreateSource {
545 stmt:
546 CreateSourceStatement {
547 with_properties, ..
548 },
549 ..
550 }
551 | Statement::CreateSubscription {
552 stmt:
553 CreateSubscriptionStatement {
554 with_properties, ..
555 },
556 ..
557 }
558 | Statement::CreateIndex {
559 with_properties, ..
560 } => Self::try_from(with_properties.0.as_slice()),
561 Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
562
563 _ => Ok(Default::default()),
564 }
565 }
566}