1use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21 PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24 PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::plan_common::SourceRefreshMode;
30use risingwave_pb::plan_common::source_refresh_mode::{
31 RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
32};
33use risingwave_pb::secret::PbSecretRef;
34use risingwave_pb::secret::secret_ref::PbRefAsType;
35use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
36use risingwave_sqlparser::ast::{
37 BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
38 CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
39 SqlOptionValue, Statement, Value,
40};
41use thiserror_ext::AsReport;
42
43use super::OverwriteOptions;
44use crate::Binder;
45use crate::error::{ErrorCode, Result as RwResult, RwError};
46use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
47use crate::session::SessionImpl;
48use crate::telemetry::report_event;
49
/// Keys of `WITH` options that are referenced by name elsewhere in this module.
pub mod options {
    /// Key of the option parsed by `WithOptions::retention_seconds`.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
53
/// Option key read by `WithOptions::refresh_interval_sec`.
pub const MV_REFRESH_INTERVAL_SEC_KEY: &str = "refresh.interval.sec";
/// Option key selecting the source refresh mode; accepted values are `STREAMING` and
/// `FULL_RELOAD` (see `resolve_source_refresh_mode_in_with_option`).
pub const SOURCE_REFRESH_MODE_KEY: &str = "refresh_mode";
/// Option key carrying the refresh interval, in seconds, of a `FULL_RELOAD` source.
pub const SOURCE_REFRESH_INTERVAL_SEC_KEY: &str = "refresh_interval_sec";
57
58#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
60pub struct WithOptions {
61 inner: BTreeMap<String, String>,
62 secret_ref: BTreeMap<String, SecretRefValue>,
63 connection_ref: BTreeMap<String, ConnectionRefValue>,
64 backfill_order_strategy: BackfillOrderStrategy,
65}
66
67impl GetKeyIter for WithOptions {
68 fn key_iter(&self) -> impl Iterator<Item = &str> {
69 self.inner.keys().map(|s| s.as_str())
70 }
71}
72
73impl Get for WithOptions {
74 fn get(&self, key: &str) -> Option<&String> {
75 self.inner.get(key)
76 }
77}
78
79impl std::ops::Deref for WithOptions {
80 type Target = BTreeMap<String, String>;
81
82 fn deref(&self) -> &Self::Target {
83 &self.inner
84 }
85}
86
87impl std::ops::DerefMut for WithOptions {
88 fn deref_mut(&mut self) -> &mut Self::Target {
89 &mut self.inner
90 }
91}
92
93impl WithOptions {
94 pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
96 Self {
97 inner,
98 secret_ref: Default::default(),
99 connection_ref: Default::default(),
100 backfill_order_strategy: Default::default(),
101 }
102 }
103
104 pub fn new(
106 inner: BTreeMap<String, String>,
107 secret_ref: BTreeMap<String, SecretRefValue>,
108 connection_ref: BTreeMap<String, ConnectionRefValue>,
109 ) -> Self {
110 Self {
111 inner,
112 secret_ref,
113 connection_ref,
114 backfill_order_strategy: Default::default(),
115 }
116 }
117
118 pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
119 &mut self.inner
120 }
121
122 pub fn into_parts(
124 self,
125 ) -> (
126 BTreeMap<String, String>,
127 BTreeMap<String, SecretRefValue>,
128 BTreeMap<String, ConnectionRefValue>,
129 ) {
130 (self.inner, self.secret_ref, self.connection_ref)
131 }
132
133 pub fn into_connector_props(self) -> WithOptions {
135 let inner = self
136 .inner
137 .into_iter()
138 .filter(|(key, _)| {
139 key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
140 })
141 .collect();
142
143 Self {
144 inner,
145 secret_ref: self.secret_ref,
146 connection_ref: self.connection_ref,
147 backfill_order_strategy: self.backfill_order_strategy,
148 }
149 }
150
151 pub fn retention_seconds(&self) -> Option<NonZeroU32> {
153 self.inner
154 .get(options::RETENTION_SECONDS)
155 .and_then(|s| s.parse().ok())
156 }
157
158 pub fn refresh_interval_sec(&self) -> RwResult<Option<u64>> {
159 self.inner
160 .get(MV_REFRESH_INTERVAL_SEC_KEY)
161 .map(|seconds| {
162 let seconds = seconds.parse::<u64>().map_err(|e| {
163 RwError::from(ErrorCode::InvalidParameterValue(format!(
164 "`{}` must be a positive integer, but got: {} (error: {})",
165 MV_REFRESH_INTERVAL_SEC_KEY,
166 seconds,
167 e.as_report()
168 )))
169 })?;
170 if seconds == 0 {
171 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
172 "`{}` must be larger than 0, but got: {}",
173 MV_REFRESH_INTERVAL_SEC_KEY, seconds
174 ))));
175 }
176 Ok(seconds)
177 })
178 .transpose()
179 }
180
181 pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
183 let inner = keys
184 .into_iter()
185 .filter_map(|k| {
186 self.inner
187 .get_key_value(k.as_ref())
188 .map(|(k, v)| (k.clone(), v.clone()))
189 })
190 .collect();
191
192 Self {
193 inner,
194 secret_ref: self.secret_ref.clone(),
195 connection_ref: self.connection_ref.clone(),
196 backfill_order_strategy: self.backfill_order_strategy.clone(),
197 }
198 }
199
200 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
201 if let Some(inner_val) = self.inner.get(key)
202 && inner_val.eq_ignore_ascii_case(val)
203 {
204 return true;
205 }
206 false
207 }
208
209 pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
210 &self.secret_ref
211 }
212
213 pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
214 &mut self.secret_ref
215 }
216
217 pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
218 &self.connection_ref
219 }
220
221 pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
222 &mut self.connection_ref
223 }
224
225 pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
226 let WithOptions {
227 inner, secret_ref, ..
228 } = WithOptions::try_from(sql_options)?;
229 if secret_ref.is_empty() {
230 Ok(inner)
231 } else {
232 Err(RwError::from(ErrorCode::InvalidParameterValue(
233 "Secret reference is not allowed in OAuth options".to_owned(),
234 )))
235 }
236 }
237
238 pub fn is_source_connector(&self) -> bool {
239 self.inner.contains_key(UPSTREAM_SOURCE_KEY)
240 && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
241 }
242
243 pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
244 self.backfill_order_strategy.clone()
245 }
246}
247
248pub(crate) fn resolve_connection_ref_and_secret_ref(
273 with_options: WithOptions,
274 session: &SessionImpl,
275 object: Option<TelemetryDatabaseObject>,
276) -> RwResult<(
277 WithOptionsSecResolved,
278 PbConnectionType,
279 Option<ConnectionId>,
280)> {
281 let connector_name = with_options.get_connector();
282 let db_name: &str = &session.database();
283 let (mut options, secret_refs, connection_refs) = with_options.into_parts();
284
285 let mut connection_id = None;
286 let mut connection_params = None;
287 for connection_ref in connection_refs.values() {
288 connection_params = {
290 let (schema_name, connection_name) =
292 Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
293 let connection_catalog =
294 session.get_connection_by_name(schema_name, &connection_name)?;
295 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
296 connection_id = Some(connection_catalog.id);
297 Some(params.clone())
298 } else {
299 return Err(RwError::from(ErrorCode::InvalidParameterValue(
300 "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
301 )));
302 }
303 };
304
305 if let Some(object) = object {
307 report_event(
309 PbTelemetryEventStage::CreateStreamJob,
310 "connection_ref",
311 0,
312 connector_name.clone(),
313 Some(object),
314 {
315 connection_params.as_ref().map(|cp| {
316 jsonbb::json!({
317 "connection_type": cp.connection_type().as_str_name().to_owned()
318 })
319 })
320 },
321 );
322 }
323 }
324
325 let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
326
327 let mut connection_type = PbConnectionType::Unspecified;
329 let connection_params_is_none_flag = connection_params.is_none();
330
331 if let Some(connection_params) = connection_params {
333 if let Some(broker_rewrite_map) = connection_params
341 .get_properties()
342 .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
343 && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
344 || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
345 {
346 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
347 "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
348 broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
349 ))));
350 }
351
352 connection_type = connection_params.connection_type();
354 for (k, v) in connection_params.properties {
355 if options.insert(k.clone(), v).is_some() {
356 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
357 "Duplicated key in both WITH clause and Connection catalog: {}",
358 k
359 ))));
360 }
361 }
362
363 for (k, v) in connection_params.secret_refs {
365 if inner_secret_refs.insert(k.clone(), v).is_some() {
366 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
367 "Duplicated key in both WITH clause and Connection catalog: {}",
368 k
369 ))));
370 }
371 }
372
373 {
375 if connection_type == PbConnectionType::SchemaRegistry {
376 if options
378 .keys()
379 .chain(inner_secret_refs.keys())
380 .any(|k| k.starts_with("aws"))
381 {
382 return Err(RwError::from(ErrorCode::InvalidParameterValue(
383 "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
384 )));
385 }
386 }
387 }
388 }
389
390 if connection_params_is_none_flag {
392 debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
393 }
394 Ok((
395 WithOptionsSecResolved::new(options, inner_secret_refs),
396 connection_type,
397 connection_id,
398 ))
399}
400
401pub(crate) fn resolve_secret_ref_in_with_options(
403 with_options: WithOptions,
404 session: &SessionImpl,
405) -> RwResult<WithOptionsSecResolved> {
406 let (options, secret_refs, _) = with_options.into_parts();
407 let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
408 Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
409}
410
411fn resolve_secret_refs_inner(
412 secret_refs: BTreeMap<String, SecretRefValue>,
413 session: &SessionImpl,
414) -> RwResult<BTreeMap<String, PbSecretRef>> {
415 let db_name: &str = &session.database();
416 let mut resolved_secret_refs = BTreeMap::new();
417 for (key, secret_ref) in secret_refs {
418 let (schema_name, secret_name) =
419 Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
420 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
421 let ref_as = match secret_ref.ref_as {
422 SecretRefAsType::Text => PbRefAsType::Text,
423 SecretRefAsType::File => PbRefAsType::File,
424 };
425 let pb_secret_ref = PbSecretRef {
426 secret_id: secret_catalog.id,
427 ref_as: ref_as.into(),
428 };
429 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
430 }
431 Ok(resolved_secret_refs)
432}
433
434pub(crate) fn resolve_source_refresh_mode_in_with_option(
435 with_options: &mut WithOptions,
436) -> RwResult<Option<SourceRefreshMode>> {
437 let source_refresh_interval_sec =
438 {
439 if let Some(maybe_int) = with_options.remove(SOURCE_REFRESH_INTERVAL_SEC_KEY) {
440 let some_int = maybe_int.parse::<i64>().map_err(|e| {
441 RwError::from(ErrorCode::InvalidParameterValue(format!(
442 "`{}` must be a positive integer and larger than 0, but got: {} (error: {})",
443 SOURCE_REFRESH_INTERVAL_SEC_KEY, maybe_int, e.as_report()
444 )))
445 })?;
446 if some_int <= 0 {
447 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
448 "`{}` must be larger than 0, but got: {}",
449 SOURCE_REFRESH_INTERVAL_SEC_KEY, some_int
450 ))));
451 }
452 Some(some_int)
453 } else {
454 None
455 }
456 };
457
458 let mut is_full_reload = false;
459 let source_refresh_mode_str = with_options.remove(SOURCE_REFRESH_MODE_KEY);
460 let source_refresh_mode = if let Some(source_refresh_mode_str) = &source_refresh_mode_str {
461 Some(match source_refresh_mode_str.to_uppercase().as_str() {
462 "STREAMING" => {
463 if source_refresh_interval_sec.is_some() {
464 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
465 "`{}` is not allowed when `{}` is 'STREAMING'",
466 SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
467 ))));
468 }
469 SourceRefreshMode {
470 refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
471 }
472 }
473 "FULL_RELOAD" => {
474 is_full_reload = true;
475 SourceRefreshMode {
476 refresh_mode: Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
477 refresh_interval_sec: source_refresh_interval_sec,
478 })),
479 }
480 }
481 _ => {
482 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
483 "Invalid key `{}`: {}, accepted values are 'STREAMING' and 'FULL_RELOAD'",
484 SOURCE_REFRESH_MODE_KEY, source_refresh_mode_str
485 ))));
486 }
487 })
488 } else {
489 if source_refresh_interval_sec.is_some() {
491 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
492 "`{}` is not allowed when `{}` is not 'FULL_RELOAD'",
493 SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
494 ))));
495 }
496 None
497 };
498
499 if with_options.is_batch_connector() && !is_full_reload {
501 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
502 "Refreshable source {} must be refreshed with 'FULL_RELOAD' refresh mode. Please set `refresh_mode` to 'FULL_RELOAD'.",
503 with_options.get_connector().unwrap(),
504 ))));
505 }
506 Ok(source_refresh_mode)
507}
508
509pub(crate) fn resolve_privatelink_in_with_option(
510 with_options: &mut WithOptions,
511) -> RwResult<Option<ConnectionId>> {
512 let is_kafka = with_options.is_kafka_connector();
513 let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
514
515 if let Some(endpoint) = privatelink_endpoint {
517 if !is_kafka {
518 return Err(RwError::from(ErrorCode::ProtocolError(
519 "Privatelink is only supported in kafka connector".to_owned(),
520 )));
521 }
522 insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
523 .map_err(RwError::from)?;
524 }
525 Ok(None)
526}
527
528impl TryFrom<&[SqlOption]> for WithOptions {
529 type Error = RwError;
530
531 fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
532 let mut inner: BTreeMap<String, String> = BTreeMap::new();
533 let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
534 let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
535 let mut backfill_order_strategy = BackfillOrderStrategy::Default;
536 for option in options {
537 let key = option.name.real_value();
538 match &option.value {
539 SqlOptionValue::SecretRef(r) => {
540 if secret_ref.insert(key.clone(), r.clone()).is_some()
541 || inner.contains_key(&key)
542 {
543 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
544 "Duplicated option: {}",
545 key
546 ))));
547 }
548 continue;
549 }
550 SqlOptionValue::ConnectionRef(r) => {
551 if connection_ref.insert(key.clone(), r.clone()).is_some()
552 || inner.contains_key(&key)
553 {
554 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
555 "Duplicated option: {}",
556 key
557 ))));
558 }
559 continue;
560 }
561 SqlOptionValue::BackfillOrder(b) => {
562 backfill_order_strategy = b.clone();
563 continue;
564 }
565 _ => {}
566 }
567 let value: String = match option.value.clone() {
568 SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
569 SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
570 SqlOptionValue::Value(Value::Number(n)) => n,
571 SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
572 _ => {
573 return Err(RwError::from(ErrorCode::InvalidParameterValue(
574 "`with options` or `with properties` only support single quoted string value and C style escaped string"
575 .to_owned(),
576 )))
577 }
578 };
579 if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
580 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
581 "Duplicated option: {}",
582 key
583 ))));
584 }
585 }
586
587 Ok(Self {
588 inner,
589 secret_ref,
590 connection_ref,
591 backfill_order_strategy,
592 })
593 }
594}
595
596impl TryFrom<&Statement> for WithOptions {
597 type Error = RwError;
598
599 fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
601 match statement {
602 Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
604
605 Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
607
608 Statement::CreateSink {
610 stmt:
611 CreateSinkStatement {
612 with_properties, ..
613 },
614 }
615 | Statement::CreateConnection {
616 stmt:
617 CreateConnectionStatement {
618 with_properties, ..
619 },
620 }
621 | Statement::CreateSource {
622 stmt:
623 CreateSourceStatement {
624 with_properties, ..
625 },
626 ..
627 }
628 | Statement::CreateSubscription {
629 stmt:
630 CreateSubscriptionStatement {
631 with_properties, ..
632 },
633 ..
634 }
635 | Statement::CreateIndex {
636 with_properties, ..
637 } => Self::try_from(with_properties.0.as_slice()),
638 Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
639
640 _ => Ok(Default::default()),
641 }
642 }
643}