1use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21 PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24 PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::plan_common::SourceRefreshMode;
30use risingwave_pb::plan_common::source_refresh_mode::{
31 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
32};
33use risingwave_pb::secret::PbSecretRef;
34use risingwave_pb::secret::secret_ref::PbRefAsType;
35use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
36use risingwave_sqlparser::ast::{
37 BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
38 CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
39 SqlOptionValue, Statement, Value,
40};
41use thiserror_ext::AsReport;
42
43use super::OverwriteOptions;
44use crate::Binder;
45use crate::error::{ErrorCode, Result as RwResult, RwError};
46use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
47use crate::session::SessionImpl;
48use crate::telemetry::report_event;
49
/// Keys of options that `WithOptions` treats specially.
pub mod options {
    /// Option key read by `WithOptions::retention_seconds` and stripped from the
    /// option map by `WithOptions::into_connector_props`.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
53
/// Option key selecting the source refresh mode. The value is compared after
/// upper-casing; the accepted values are `STREAMING` and `FULL_RELOAD`.
pub const SOURCE_REFRESH_MODE_KEY: &str = "refresh_mode";
/// Option key carrying the refresh interval in seconds. The value must parse as an
/// integer greater than 0 and is only accepted together with the `FULL_RELOAD` mode.
pub const SOURCE_REFRESH_INTERVAL_SEC_KEY: &str = "refresh_interval_sec";
56
/// Options collected from a list of [`SqlOption`]s (see the `TryFrom<&[SqlOption]>` impl).
///
/// Plain values, secret references and connection references are stored in separate
/// maps. Only the plain values are reachable through `Deref`/`DerefMut`.
#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WithOptions {
    /// Plain `key = value` options; every value is stored in its string form.
    inner: BTreeMap<String, String>,
    /// Options whose value is a secret reference, keyed by option name.
    secret_ref: BTreeMap<String, SecretRefValue>,
    /// Options whose value is a connection reference, keyed by option name.
    connection_ref: BTreeMap<String, ConnectionRefValue>,
    /// Backfill order strategy taken from the options; keeps its default value
    /// when no such option was given.
    backfill_order_strategy: BackfillOrderStrategy,
}
65
66impl GetKeyIter for WithOptions {
67 fn key_iter(&self) -> impl Iterator<Item = &str> {
68 self.inner.keys().map(|s| s.as_str())
69 }
70}
71
/// Looks a key up in the plain options only; secret and connection references
/// are not consulted.
impl Get for WithOptions {
    fn get(&self, key: &str) -> Option<&String> {
        self.inner.get(key)
    }
}
77
/// Lets a [`WithOptions`] be read as its plain option map. The secret and connection
/// reference maps are not reachable through this deref.
impl std::ops::Deref for WithOptions {
    type Target = BTreeMap<String, String>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
85
/// Gives mutable access to the plain option map, so map methods such as `insert`
/// and `remove` can be called directly on a [`WithOptions`].
impl std::ops::DerefMut for WithOptions {
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
91
92impl WithOptions {
93 pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
95 Self {
96 inner,
97 secret_ref: Default::default(),
98 connection_ref: Default::default(),
99 backfill_order_strategy: Default::default(),
100 }
101 }
102
103 pub fn new(
105 inner: BTreeMap<String, String>,
106 secret_ref: BTreeMap<String, SecretRefValue>,
107 connection_ref: BTreeMap<String, ConnectionRefValue>,
108 ) -> Self {
109 Self {
110 inner,
111 secret_ref,
112 connection_ref,
113 backfill_order_strategy: Default::default(),
114 }
115 }
116
117 pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
118 &mut self.inner
119 }
120
121 pub fn into_parts(
123 self,
124 ) -> (
125 BTreeMap<String, String>,
126 BTreeMap<String, SecretRefValue>,
127 BTreeMap<String, ConnectionRefValue>,
128 ) {
129 (self.inner, self.secret_ref, self.connection_ref)
130 }
131
132 pub fn into_connector_props(self) -> WithOptions {
134 let inner = self
135 .inner
136 .into_iter()
137 .filter(|(key, _)| {
138 key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
139 })
140 .collect();
141
142 Self {
143 inner,
144 secret_ref: self.secret_ref,
145 connection_ref: self.connection_ref,
146 backfill_order_strategy: self.backfill_order_strategy,
147 }
148 }
149
150 pub fn retention_seconds(&self) -> Option<NonZeroU32> {
152 self.inner
153 .get(options::RETENTION_SECONDS)
154 .and_then(|s| s.parse().ok())
155 }
156
157 pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
159 let inner = keys
160 .into_iter()
161 .filter_map(|k| {
162 self.inner
163 .get_key_value(k.as_ref())
164 .map(|(k, v)| (k.clone(), v.clone()))
165 })
166 .collect();
167
168 Self {
169 inner,
170 secret_ref: self.secret_ref.clone(),
171 connection_ref: self.connection_ref.clone(),
172 backfill_order_strategy: self.backfill_order_strategy.clone(),
173 }
174 }
175
176 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
177 if let Some(inner_val) = self.inner.get(key)
178 && inner_val.eq_ignore_ascii_case(val)
179 {
180 return true;
181 }
182 false
183 }
184
185 pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
186 &self.secret_ref
187 }
188
189 pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
190 &mut self.secret_ref
191 }
192
193 pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
194 &self.connection_ref
195 }
196
197 pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
198 &mut self.connection_ref
199 }
200
201 pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
202 let WithOptions {
203 inner, secret_ref, ..
204 } = WithOptions::try_from(sql_options)?;
205 if secret_ref.is_empty() {
206 Ok(inner)
207 } else {
208 Err(RwError::from(ErrorCode::InvalidParameterValue(
209 "Secret reference is not allowed in OAuth options".to_owned(),
210 )))
211 }
212 }
213
214 pub fn is_source_connector(&self) -> bool {
215 self.inner.contains_key(UPSTREAM_SOURCE_KEY)
216 && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
217 }
218
219 pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
220 self.backfill_order_strategy.clone()
221 }
222}
223
224pub(crate) fn resolve_connection_ref_and_secret_ref(
249 with_options: WithOptions,
250 session: &SessionImpl,
251 object: Option<TelemetryDatabaseObject>,
252) -> RwResult<(
253 WithOptionsSecResolved,
254 PbConnectionType,
255 Option<ConnectionId>,
256)> {
257 let connector_name = with_options.get_connector();
258 let db_name: &str = &session.database();
259 let (mut options, secret_refs, connection_refs) = with_options.into_parts();
260
261 let mut connection_id = None;
262 let mut connection_params = None;
263 for connection_ref in connection_refs.values() {
264 connection_params = {
266 let (schema_name, connection_name) =
268 Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
269 let connection_catalog =
270 session.get_connection_by_name(schema_name, &connection_name)?;
271 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
272 connection_id = Some(connection_catalog.id);
273 Some(params.clone())
274 } else {
275 return Err(RwError::from(ErrorCode::InvalidParameterValue(
276 "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
277 )));
278 }
279 };
280
281 if let Some(object) = object {
283 report_event(
285 PbTelemetryEventStage::CreateStreamJob,
286 "connection_ref",
287 0,
288 connector_name.clone(),
289 Some(object),
290 {
291 connection_params.as_ref().map(|cp| {
292 jsonbb::json!({
293 "connection_type": cp.connection_type().as_str_name().to_owned()
294 })
295 })
296 },
297 );
298 }
299 }
300
301 let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
302
303 let mut connection_type = PbConnectionType::Unspecified;
305 let connection_params_is_none_flag = connection_params.is_none();
306
307 if let Some(connection_params) = connection_params {
309 if let Some(broker_rewrite_map) = connection_params
317 .get_properties()
318 .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
319 && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
320 || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
321 {
322 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
323 "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
324 broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
325 ))));
326 }
327
328 connection_type = connection_params.connection_type();
330 for (k, v) in connection_params.properties {
331 if options.insert(k.clone(), v).is_some() {
332 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
333 "Duplicated key in both WITH clause and Connection catalog: {}",
334 k
335 ))));
336 }
337 }
338
339 for (k, v) in connection_params.secret_refs {
341 if inner_secret_refs.insert(k.clone(), v).is_some() {
342 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
343 "Duplicated key in both WITH clause and Connection catalog: {}",
344 k
345 ))));
346 }
347 }
348
349 {
351 if connection_type == PbConnectionType::SchemaRegistry {
352 if options
354 .keys()
355 .chain(inner_secret_refs.keys())
356 .any(|k| k.starts_with("aws"))
357 {
358 return Err(RwError::from(ErrorCode::InvalidParameterValue(
359 "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
360 )));
361 }
362 }
363 }
364 }
365
366 if connection_params_is_none_flag {
368 debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
369 }
370 Ok((
371 WithOptionsSecResolved::new(options, inner_secret_refs),
372 connection_type,
373 connection_id,
374 ))
375}
376
377pub(crate) fn resolve_secret_ref_in_with_options(
379 with_options: WithOptions,
380 session: &SessionImpl,
381) -> RwResult<WithOptionsSecResolved> {
382 let (options, secret_refs, _) = with_options.into_parts();
383 let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
384 Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
385}
386
387fn resolve_secret_refs_inner(
388 secret_refs: BTreeMap<String, SecretRefValue>,
389 session: &SessionImpl,
390) -> RwResult<BTreeMap<String, PbSecretRef>> {
391 let db_name: &str = &session.database();
392 let mut resolved_secret_refs = BTreeMap::new();
393 for (key, secret_ref) in secret_refs {
394 let (schema_name, secret_name) =
395 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
396 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
397 let ref_as = match secret_ref.ref_as {
398 SecretRefAsType::Text => PbRefAsType::Text,
399 SecretRefAsType::File => PbRefAsType::File,
400 };
401 let pb_secret_ref = PbSecretRef {
402 secret_id: secret_catalog.id,
403 ref_as: ref_as.into(),
404 };
405 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
406 }
407 Ok(resolved_secret_refs)
408}
409
410pub(crate) fn resolve_source_refresh_mode_in_with_option(
411 with_options: &mut WithOptions,
412) -> RwResult<Option<SourceRefreshMode>> {
413 let source_refresh_interval_sec =
414 {
415 if let Some(maybe_int) = with_options.remove(SOURCE_REFRESH_INTERVAL_SEC_KEY) {
416 let some_int = maybe_int.parse::<i64>().map_err(|e| {
417 RwError::from(ErrorCode::InvalidParameterValue(format!(
418 "`{}` must be a positive integer and larger than 0, but got: {} (error: {})",
419 SOURCE_REFRESH_INTERVAL_SEC_KEY, maybe_int, e.as_report()
420 )))
421 })?;
422 if some_int <= 0 {
423 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
424 "`{}` must be larger than 0, but got: {}",
425 SOURCE_REFRESH_INTERVAL_SEC_KEY, some_int
426 ))));
427 }
428 Some(some_int)
429 } else {
430 None
431 }
432 };
433
434 let source_refresh_mode =
435 if let Some(source_refresh_mode_str) = with_options.remove(SOURCE_REFRESH_MODE_KEY) {
436 match source_refresh_mode_str.to_uppercase().as_str() {
437 "STREAMING" => {
438 if source_refresh_interval_sec.is_some() {
439 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
440 "`{}` is not allowed when `{}` is 'STREAMING'",
441 SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
442 ))));
443 }
444 SourceRefreshMode {
445 refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
446 }
447 }
448 "FULL_RELOAD" => SourceRefreshMode {
449 refresh_mode: Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
450 refresh_interval_sec: source_refresh_interval_sec,
451 })),
452 },
453 _ => {
454 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
455 "Invalid key `{}`: {}, accepted values are 'STREAMING' and 'FULL_RELOAD'",
456 SOURCE_REFRESH_MODE_KEY, source_refresh_mode_str
457 ))));
458 }
459 }
460 } else {
461 if source_refresh_interval_sec.is_some() {
463 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
464 "`{}` is not allowed when `{}` is not 'FULL_RELOAD'",
465 SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
466 ))));
467 }
468 return Ok(None);
469 };
470 Ok(Some(source_refresh_mode))
471}
472
473pub(crate) fn resolve_privatelink_in_with_option(
474 with_options: &mut WithOptions,
475) -> RwResult<Option<ConnectionId>> {
476 let is_kafka = with_options.is_kafka_connector();
477 let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
478
479 if let Some(endpoint) = privatelink_endpoint {
481 if !is_kafka {
482 return Err(RwError::from(ErrorCode::ProtocolError(
483 "Privatelink is only supported in kafka connector".to_owned(),
484 )));
485 }
486 insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
487 .map_err(RwError::from)?;
488 }
489 Ok(None)
490}
491
492impl TryFrom<&[SqlOption]> for WithOptions {
493 type Error = RwError;
494
495 fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
496 let mut inner: BTreeMap<String, String> = BTreeMap::new();
497 let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
498 let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
499 let mut backfill_order_strategy = BackfillOrderStrategy::Default;
500 for option in options {
501 let key = option.name.real_value();
502 match &option.value {
503 SqlOptionValue::SecretRef(r) => {
504 if secret_ref.insert(key.clone(), r.clone()).is_some()
505 || inner.contains_key(&key)
506 {
507 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
508 "Duplicated option: {}",
509 key
510 ))));
511 }
512 continue;
513 }
514 SqlOptionValue::ConnectionRef(r) => {
515 if connection_ref.insert(key.clone(), r.clone()).is_some()
516 || inner.contains_key(&key)
517 {
518 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
519 "Duplicated option: {}",
520 key
521 ))));
522 }
523 continue;
524 }
525 SqlOptionValue::BackfillOrder(b) => {
526 backfill_order_strategy = b.clone();
527 continue;
528 }
529 _ => {}
530 }
531 let value: String = match option.value.clone() {
532 SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
533 SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
534 SqlOptionValue::Value(Value::Number(n)) => n,
535 SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
536 _ => {
537 return Err(RwError::from(ErrorCode::InvalidParameterValue(
538 "`with options` or `with properties` only support single quoted string value and C style escaped string"
539 .to_owned(),
540 )))
541 }
542 };
543 if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
544 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
545 "Duplicated option: {}",
546 key
547 ))));
548 }
549 }
550
551 Ok(Self {
552 inner,
553 secret_ref,
554 connection_ref,
555 backfill_order_strategy,
556 })
557 }
558}
559
560impl TryFrom<&Statement> for WithOptions {
561 type Error = RwError;
562
563 fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
565 match statement {
566 Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
568
569 Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
571
572 Statement::CreateSink {
574 stmt:
575 CreateSinkStatement {
576 with_properties, ..
577 },
578 }
579 | Statement::CreateConnection {
580 stmt:
581 CreateConnectionStatement {
582 with_properties, ..
583 },
584 }
585 | Statement::CreateSource {
586 stmt:
587 CreateSourceStatement {
588 with_properties, ..
589 },
590 ..
591 }
592 | Statement::CreateSubscription {
593 stmt:
594 CreateSubscriptionStatement {
595 with_properties, ..
596 },
597 ..
598 }
599 | Statement::CreateIndex {
600 with_properties, ..
601 } => Self::try_from(with_properties.0.as_slice()),
602 Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
603
604 _ => Ok(Default::default()),
605 }
606 }
607}