1use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21 PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24 PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33 BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
34 CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
35 SqlOptionValue, Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
45pub mod options {
46 pub const RETENTION_SECONDS: &str = "retention_seconds";
47}
48
49#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52 inner: BTreeMap<String, String>,
53 secret_ref: BTreeMap<String, SecretRefValue>,
54 connection_ref: BTreeMap<String, ConnectionRefValue>,
55 backfill_order_strategy: BackfillOrderStrategy,
56}
57
58impl GetKeyIter for WithOptions {
59 fn key_iter(&self) -> impl Iterator<Item = &str> {
60 self.inner.keys().map(|s| s.as_str())
61 }
62}
63
64impl Get for WithOptions {
65 fn get(&self, key: &str) -> Option<&String> {
66 self.inner.get(key)
67 }
68}
69
70impl std::ops::Deref for WithOptions {
71 type Target = BTreeMap<String, String>;
72
73 fn deref(&self) -> &Self::Target {
74 &self.inner
75 }
76}
77
78impl std::ops::DerefMut for WithOptions {
79 fn deref_mut(&mut self) -> &mut Self::Target {
80 &mut self.inner
81 }
82}
83
84impl WithOptions {
85 pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
87 Self {
88 inner,
89 secret_ref: Default::default(),
90 connection_ref: Default::default(),
91 backfill_order_strategy: Default::default(),
92 }
93 }
94
95 pub fn new(
97 inner: BTreeMap<String, String>,
98 secret_ref: BTreeMap<String, SecretRefValue>,
99 connection_ref: BTreeMap<String, ConnectionRefValue>,
100 ) -> Self {
101 Self {
102 inner,
103 secret_ref,
104 connection_ref,
105 backfill_order_strategy: Default::default(),
106 }
107 }
108
109 pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
110 &mut self.inner
111 }
112
113 pub fn into_parts(
115 self,
116 ) -> (
117 BTreeMap<String, String>,
118 BTreeMap<String, SecretRefValue>,
119 BTreeMap<String, ConnectionRefValue>,
120 ) {
121 (self.inner, self.secret_ref, self.connection_ref)
122 }
123
124 pub fn into_connector_props(self) -> WithOptions {
126 let inner = self
127 .inner
128 .into_iter()
129 .filter(|(key, _)| {
130 key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
131 })
132 .collect();
133
134 Self {
135 inner,
136 secret_ref: self.secret_ref,
137 connection_ref: self.connection_ref,
138 backfill_order_strategy: self.backfill_order_strategy,
139 }
140 }
141
142 pub fn retention_seconds(&self) -> Option<NonZeroU32> {
144 self.inner
145 .get(options::RETENTION_SECONDS)
146 .and_then(|s| s.parse().ok())
147 }
148
149 pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
151 let inner = keys
152 .into_iter()
153 .filter_map(|k| {
154 self.inner
155 .get_key_value(k.as_ref())
156 .map(|(k, v)| (k.clone(), v.clone()))
157 })
158 .collect();
159
160 Self {
161 inner,
162 secret_ref: self.secret_ref.clone(),
163 connection_ref: self.connection_ref.clone(),
164 backfill_order_strategy: self.backfill_order_strategy.clone(),
165 }
166 }
167
168 pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
169 if let Some(inner_val) = self.inner.get(key) {
170 if inner_val.eq_ignore_ascii_case(val) {
171 return true;
172 }
173 }
174 false
175 }
176
177 pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
178 &self.secret_ref
179 }
180
181 pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
182 &mut self.secret_ref
183 }
184
185 pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
186 &self.connection_ref
187 }
188
189 pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
190 &mut self.connection_ref
191 }
192
193 pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
194 let WithOptions {
195 inner, secret_ref, ..
196 } = WithOptions::try_from(sql_options)?;
197 if secret_ref.is_empty() {
198 Ok(inner)
199 } else {
200 Err(RwError::from(ErrorCode::InvalidParameterValue(
201 "Secret reference is not allowed in OAuth options".to_owned(),
202 )))
203 }
204 }
205
206 pub fn is_source_connector(&self) -> bool {
207 self.inner.contains_key(UPSTREAM_SOURCE_KEY)
208 && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
209 }
210
211 pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
212 self.backfill_order_strategy.clone()
213 }
214}
215
216pub(crate) fn resolve_connection_ref_and_secret_ref(
217 with_options: WithOptions,
218 session: &SessionImpl,
219 object: Option<TelemetryDatabaseObject>,
220) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
221 let connector_name = with_options.get_connector();
222 let db_name: &str = &session.database();
223 let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
224
225 let mut connection_id = None;
226 let mut connection_params = None;
227 for connection_ref in connection_refs.values() {
228 connection_params = {
230 let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
232 db_name,
233 connection_ref.connection_name.clone(),
234 )?;
235 let connection_catalog =
236 session.get_connection_by_name(schema_name, &connection_name)?;
237 if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
238 connection_id = Some(connection_catalog.id);
239 Some(params.clone())
240 } else {
241 return Err(RwError::from(ErrorCode::InvalidParameterValue(
242 "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
243 )));
244 }
245 };
246
247 if let Some(object) = object {
248 report_event(
250 PbTelemetryEventStage::CreateStreamJob,
251 "connection_ref",
252 0,
253 connector_name.clone(),
254 Some(object),
255 {
256 connection_params.as_ref().map(|cp| {
257 jsonbb::json!({
258 "connection_type": cp.connection_type().as_str_name().to_owned()
259 })
260 })
261 },
262 );
263 }
264 }
265
266 let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
267
268 let mut connection_type = PbConnectionType::Unspecified;
269 let connection_params_is_none_flag = connection_params.is_none();
270
271 if let Some(connection_params) = connection_params {
272 if let Some(broker_rewrite_map) = connection_params
280 .get_properties()
281 .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
282 {
283 if options.contains_key(PRIVATE_LINK_TARGETS_KEY)
284 || options.contains_key(PRIVATELINK_ENDPOINT_KEY)
285 {
286 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
287 "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
288 broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
289 ))));
290 }
291 }
292
293 connection_type = connection_params.connection_type();
294 for (k, v) in connection_params.properties {
295 if options.insert(k.clone(), v).is_some() {
296 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
297 "Duplicated key in both WITH clause and Connection catalog: {}",
298 k
299 ))));
300 }
301 }
302
303 for (k, v) in connection_params.secret_refs {
304 if inner_secret_refs.insert(k.clone(), v).is_some() {
305 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
306 "Duplicated key in both WITH clause and Connection catalog: {}",
307 k
308 ))));
309 }
310 }
311
312 {
313 if connection_type == PbConnectionType::SchemaRegistry {
315 if options
317 .keys()
318 .chain(inner_secret_refs.keys())
319 .any(|k| k.starts_with("aws"))
320 {
321 return Err(RwError::from(ErrorCode::InvalidParameterValue(
322 "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
323 )));
324 }
325 }
326 }
327 }
328
329 if connection_params_is_none_flag {
331 debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
332 }
333 Ok((
334 WithOptionsSecResolved::new(options, inner_secret_refs),
335 connection_type,
336 connection_id,
337 ))
338}
339
340pub(crate) fn resolve_secret_ref_in_with_options(
342 with_options: WithOptions,
343 session: &SessionImpl,
344) -> RwResult<WithOptionsSecResolved> {
345 let (options, secret_refs, _) = with_options.into_parts();
346 let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
347 Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
348}
349
350fn resolve_secret_refs_inner(
351 secret_refs: BTreeMap<String, SecretRefValue>,
352 session: &SessionImpl,
353) -> RwResult<BTreeMap<String, PbSecretRef>> {
354 let db_name: &str = &session.database();
355 let mut resolved_secret_refs = BTreeMap::new();
356 for (key, secret_ref) in secret_refs {
357 let (schema_name, secret_name) =
358 Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
359 let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
360 let ref_as = match secret_ref.ref_as {
361 SecretRefAsType::Text => PbRefAsType::Text,
362 SecretRefAsType::File => PbRefAsType::File,
363 };
364 let pb_secret_ref = PbSecretRef {
365 secret_id: secret_catalog.id.secret_id(),
366 ref_as: ref_as.into(),
367 };
368 resolved_secret_refs.insert(key.clone(), pb_secret_ref);
369 }
370 Ok(resolved_secret_refs)
371}
372
373pub(crate) fn resolve_privatelink_in_with_option(
374 with_options: &mut WithOptions,
375) -> RwResult<Option<ConnectionId>> {
376 let is_kafka = with_options.is_kafka_connector();
377 let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
378
379 if let Some(endpoint) = privatelink_endpoint {
381 if !is_kafka {
382 return Err(RwError::from(ErrorCode::ProtocolError(
383 "Privatelink is only supported in kafka connector".to_owned(),
384 )));
385 }
386 insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
387 .map_err(RwError::from)?;
388 }
389 Ok(None)
390}
391
392impl TryFrom<&[SqlOption]> for WithOptions {
393 type Error = RwError;
394
395 fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
396 let mut inner: BTreeMap<String, String> = BTreeMap::new();
397 let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
398 let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
399 let mut backfill_order_strategy = BackfillOrderStrategy::Default;
400 for option in options {
401 let key = option.name.real_value();
402 match &option.value {
403 SqlOptionValue::SecretRef(r) => {
404 if secret_ref.insert(key.clone(), r.clone()).is_some()
405 || inner.contains_key(&key)
406 {
407 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
408 "Duplicated option: {}",
409 key
410 ))));
411 }
412 continue;
413 }
414 SqlOptionValue::ConnectionRef(r) => {
415 if connection_ref.insert(key.clone(), r.clone()).is_some()
416 || inner.contains_key(&key)
417 {
418 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
419 "Duplicated option: {}",
420 key
421 ))));
422 }
423 continue;
424 }
425 SqlOptionValue::BackfillOrder(b) => {
426 backfill_order_strategy = b.clone();
427 continue;
428 }
429 _ => {}
430 }
431 let value: String = match option.value.clone() {
432 SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
433 SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
434 SqlOptionValue::Value(Value::Number(n)) => n,
435 SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
436 _ => {
437 return Err(RwError::from(ErrorCode::InvalidParameterValue(
438 "`with options` or `with properties` only support single quoted string value and C style escaped string"
439 .to_owned(),
440 )))
441 }
442 };
443 if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
444 return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
445 "Duplicated option: {}",
446 key
447 ))));
448 }
449 }
450
451 Ok(Self {
452 inner,
453 secret_ref,
454 connection_ref,
455 backfill_order_strategy,
456 })
457 }
458}
459
460impl TryFrom<&Statement> for WithOptions {
461 type Error = RwError;
462
463 fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
465 match statement {
466 Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
468
469 Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
471
472 Statement::CreateSink {
474 stmt:
475 CreateSinkStatement {
476 with_properties, ..
477 },
478 }
479 | Statement::CreateConnection {
480 stmt:
481 CreateConnectionStatement {
482 with_properties, ..
483 },
484 } => Self::try_from(with_properties.0.as_slice()),
485 Statement::CreateSource {
486 stmt:
487 CreateSourceStatement {
488 with_properties, ..
489 },
490 ..
491 } => Self::try_from(with_properties.0.as_slice()),
492 Statement::CreateSubscription {
493 stmt:
494 CreateSubscriptionStatement {
495 with_properties, ..
496 },
497 ..
498 } => Self::try_from(with_properties.0.as_slice()),
499 Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
500
501 _ => Ok(Default::default()),
502 }
503 }
504}