risingwave_frontend/utils/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33    ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
34    CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, SqlOptionValue,
35    Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
45pub mod options {
46    pub const RETENTION_SECONDS: &str = "retention_seconds";
47}
48
49/// Options or properties extracted from the `WITH` clause of DDLs.
50#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52    inner: BTreeMap<String, String>,
53    secret_ref: BTreeMap<String, SecretRefValue>,
54    connection_ref: BTreeMap<String, ConnectionRefValue>,
55}
56
57impl GetKeyIter for WithOptions {
58    fn key_iter(&self) -> impl Iterator<Item = &str> {
59        self.inner.keys().map(|s| s.as_str())
60    }
61}
62
63impl Get for WithOptions {
64    fn get(&self, key: &str) -> Option<&String> {
65        self.inner.get(key)
66    }
67}
68
69impl std::ops::Deref for WithOptions {
70    type Target = BTreeMap<String, String>;
71
72    fn deref(&self) -> &Self::Target {
73        &self.inner
74    }
75}
76
77impl std::ops::DerefMut for WithOptions {
78    fn deref_mut(&mut self) -> &mut Self::Target {
79        &mut self.inner
80    }
81}
82
83impl WithOptions {
84    /// Create a new [`WithOptions`] from a [`BTreeMap`].
85    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
86        Self {
87            inner,
88            secret_ref: Default::default(),
89            connection_ref: Default::default(),
90        }
91    }
92
93    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
94    pub fn new(
95        inner: BTreeMap<String, String>,
96        secret_ref: BTreeMap<String, SecretRefValue>,
97        connection_ref: BTreeMap<String, ConnectionRefValue>,
98    ) -> Self {
99        Self {
100            inner,
101            secret_ref,
102            connection_ref,
103        }
104    }
105
106    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
107        &mut self.inner
108    }
109
110    /// Take the value of the option map and secret refs.
111    pub fn into_parts(
112        self,
113    ) -> (
114        BTreeMap<String, String>,
115        BTreeMap<String, SecretRefValue>,
116        BTreeMap<String, ConnectionRefValue>,
117    ) {
118        (self.inner, self.secret_ref, self.connection_ref)
119    }
120
121    /// Convert to connector props, remove the key-value pairs used in the top-level.
122    pub fn into_connector_props(self) -> WithOptions {
123        let inner = self
124            .inner
125            .into_iter()
126            .filter(|(key, _)| {
127                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
128            })
129            .collect();
130
131        Self {
132            inner,
133            secret_ref: self.secret_ref,
134            connection_ref: self.connection_ref,
135        }
136    }
137
138    /// Parse the retention seconds from the options.
139    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
140        self.inner
141            .get(options::RETENTION_SECONDS)
142            .and_then(|s| s.parse().ok())
143    }
144
145    /// Get a subset of the options from the given keys.
146    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
147        let inner = keys
148            .into_iter()
149            .filter_map(|k| {
150                self.inner
151                    .get_key_value(k.as_ref())
152                    .map(|(k, v)| (k.clone(), v.clone()))
153            })
154            .collect();
155
156        Self {
157            inner,
158            secret_ref: self.secret_ref.clone(),
159            connection_ref: self.connection_ref.clone(),
160        }
161    }
162
163    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
164        if let Some(inner_val) = self.inner.get(key) {
165            if inner_val.eq_ignore_ascii_case(val) {
166                return true;
167            }
168        }
169        false
170    }
171
172    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
173        &self.secret_ref
174    }
175
176    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
177        &mut self.secret_ref
178    }
179
180    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
181        &self.connection_ref
182    }
183
184    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
185        &mut self.connection_ref
186    }
187
188    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
189        let WithOptions {
190            inner, secret_ref, ..
191        } = WithOptions::try_from(sql_options)?;
192        if secret_ref.is_empty() {
193            Ok(inner)
194        } else {
195            Err(RwError::from(ErrorCode::InvalidParameterValue(
196                "Secret reference is not allowed in OAuth options".to_owned(),
197            )))
198        }
199    }
200
201    pub fn is_source_connector(&self) -> bool {
202        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
203            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
204    }
205}
206
207pub(crate) fn resolve_connection_ref_and_secret_ref(
208    with_options: WithOptions,
209    session: &SessionImpl,
210    object: Option<TelemetryDatabaseObject>,
211) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
212    let connector_name = with_options.get_connector();
213    let db_name: &str = &session.database();
214    let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
215
216    let mut connection_id = None;
217    let mut connection_params = None;
218    for connection_ref in connection_refs.values() {
219        // at most one connection ref in the map
220        connection_params = {
221            // get connection params from catalog
222            let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
223                db_name,
224                connection_ref.connection_name.clone(),
225            )?;
226            let connection_catalog =
227                session.get_connection_by_name(schema_name, &connection_name)?;
228            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
229                connection_id = Some(connection_catalog.id);
230                Some(params.clone())
231            } else {
232                return Err(RwError::from(ErrorCode::InvalidParameterValue(
233                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
234                )));
235            }
236        };
237
238        if let Some(object) = object {
239            // report to telemetry
240            report_event(
241                PbTelemetryEventStage::CreateStreamJob,
242                "connection_ref",
243                0,
244                connector_name.clone(),
245                Some(object),
246                {
247                    connection_params.as_ref().map(|cp| {
248                        jsonbb::json!({
249                            "connection_type": cp.connection_type().as_str_name().to_owned()
250                        })
251                    })
252                },
253            );
254        }
255    }
256
257    let mut inner_secret_refs = {
258        let mut resolved_secret_refs = BTreeMap::new();
259        for (key, secret_ref) in secret_refs {
260            let (schema_name, secret_name) =
261                Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
262            let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
263            let ref_as = match secret_ref.ref_as {
264                SecretRefAsType::Text => PbRefAsType::Text,
265                SecretRefAsType::File => PbRefAsType::File,
266            };
267            let pb_secret_ref = PbSecretRef {
268                secret_id: secret_catalog.id.secret_id(),
269                ref_as: ref_as.into(),
270            };
271            resolved_secret_refs.insert(key.clone(), pb_secret_ref);
272        }
273        resolved_secret_refs
274    };
275
276    let mut connection_type = PbConnectionType::Unspecified;
277    let connection_params_is_none_flag = connection_params.is_none();
278
279    if let Some(connection_params) = connection_params {
280        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
281        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
282        //
283        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
284        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
285        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
286
287        if let Some(broker_rewrite_map) = connection_params
288            .get_properties()
289            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
290        {
291            if options.contains_key(PRIVATE_LINK_TARGETS_KEY)
292                || options.contains_key(PRIVATELINK_ENDPOINT_KEY)
293            {
294                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
295                    "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
296                    broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
297                ))));
298            }
299        }
300
301        connection_type = connection_params.connection_type();
302        for (k, v) in connection_params.properties {
303            if options.insert(k.clone(), v).is_some() {
304                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
305                    "Duplicated key in both WITH clause and Connection catalog: {}",
306                    k
307                ))));
308            }
309        }
310
311        for (k, v) in connection_params.secret_refs {
312            if inner_secret_refs.insert(k.clone(), v).is_some() {
313                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
314                    "Duplicated key in both WITH clause and Connection catalog: {}",
315                    k
316                ))));
317            }
318        }
319
320        {
321            // check if not mess up with schema registry connection and glue connection
322            if connection_type == PbConnectionType::SchemaRegistry {
323                // Check no AWS related options when using schema registry connection
324                if options
325                    .keys()
326                    .chain(inner_secret_refs.keys())
327                    .any(|k| k.starts_with("aws"))
328                {
329                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
330                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
331                        )));
332                }
333            }
334        }
335    }
336
337    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
338    if connection_params_is_none_flag {
339        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
340    }
341    Ok((
342        WithOptionsSecResolved::new(options, inner_secret_refs),
343        connection_type,
344        connection_id,
345    ))
346}
347
348/// Get the secret id from the name.
349pub(crate) fn resolve_secret_ref_in_with_options(
350    with_options: WithOptions,
351    session: &SessionImpl,
352) -> RwResult<WithOptionsSecResolved> {
353    let (options, secret_refs, _) = with_options.into_parts();
354    let mut resolved_secret_refs = BTreeMap::new();
355    let db_name: &str = &session.database();
356    for (key, secret_ref) in secret_refs {
357        let (schema_name, secret_name) =
358            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
359        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
360        let ref_as = match secret_ref.ref_as {
361            SecretRefAsType::Text => PbRefAsType::Text,
362            SecretRefAsType::File => PbRefAsType::File,
363        };
364        let pb_secret_ref = PbSecretRef {
365            secret_id: secret_catalog.id.secret_id(),
366            ref_as: ref_as.into(),
367        };
368        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
369    }
370    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
371}
372
373pub(crate) fn resolve_privatelink_in_with_option(
374    with_options: &mut WithOptions,
375) -> RwResult<Option<ConnectionId>> {
376    let is_kafka = with_options.is_kafka_connector();
377    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
378
379    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
380    if let Some(endpoint) = privatelink_endpoint {
381        if !is_kafka {
382            return Err(RwError::from(ErrorCode::ProtocolError(
383                "Privatelink is only supported in kafka connector".to_owned(),
384            )));
385        }
386        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
387            .map_err(RwError::from)?;
388    }
389    Ok(None)
390}
391
392impl TryFrom<&[SqlOption]> for WithOptions {
393    type Error = RwError;
394
395    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
396        let mut inner: BTreeMap<String, String> = BTreeMap::new();
397        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
398        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
399        for option in options {
400            let key = option.name.real_value();
401            match &option.value {
402                SqlOptionValue::SecretRef(r) => {
403                    if secret_ref.insert(key.clone(), r.clone()).is_some()
404                        || inner.contains_key(&key)
405                    {
406                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
407                            "Duplicated option: {}",
408                            key
409                        ))));
410                    }
411                    continue;
412                }
413                SqlOptionValue::ConnectionRef(r) => {
414                    if connection_ref.insert(key.clone(), r.clone()).is_some()
415                        || inner.contains_key(&key)
416                    {
417                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
418                            "Duplicated option: {}",
419                            key
420                        ))));
421                    }
422                    continue;
423                }
424                _ => {}
425            }
426            let value: String = match option.value.clone() {
427                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
428                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
429                SqlOptionValue::Value(Value::Number(n)) => n,
430                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
431                _ => {
432                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
433                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
434                            .to_owned(),
435                    )))
436                }
437            };
438            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
439                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
440                    "Duplicated option: {}",
441                    key
442                ))));
443            }
444        }
445
446        Ok(Self {
447            inner,
448            secret_ref,
449            connection_ref,
450        })
451    }
452}
453
454impl TryFrom<&Statement> for WithOptions {
455    type Error = RwError;
456
457    /// Extract options from the `WITH` clause from the given statement.
458    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
459        match statement {
460            // Explain: forward to the inner statement.
461            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
462
463            // View
464            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
465
466            // Sink
467            Statement::CreateSink {
468                stmt:
469                    CreateSinkStatement {
470                        with_properties, ..
471                    },
472            }
473            | Statement::CreateConnection {
474                stmt:
475                    CreateConnectionStatement {
476                        with_properties, ..
477                    },
478            } => Self::try_from(with_properties.0.as_slice()),
479            Statement::CreateSource {
480                stmt:
481                    CreateSourceStatement {
482                        with_properties, ..
483                    },
484                ..
485            } => Self::try_from(with_properties.0.as_slice()),
486            Statement::CreateSubscription {
487                stmt:
488                    CreateSubscriptionStatement {
489                        with_properties, ..
490                    },
491                ..
492            } => Self::try_from(with_properties.0.as_slice()),
493            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
494
495            _ => Ok(Default::default()),
496        }
497    }
498}