risingwave_frontend/utils/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::WithPropertiesExt;
21use risingwave_connector::connector_common::{
22    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
23};
24use risingwave_connector::source::kafka::private_link::{
25    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
26};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33    ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement, CreateSourceStatement,
34    CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption, SqlOptionValue,
35    Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
/// Well-known option keys that may appear in a `WITH` clause.
pub mod options {
    /// Key of the option that carries the retention period, expressed in seconds.
    pub const RETENTION_SECONDS: &'static str = "retention_seconds";
}
48
49/// Options or properties extracted from the `WITH` clause of DDLs.
50#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52    inner: BTreeMap<String, String>,
53    secret_ref: BTreeMap<String, SecretRefValue>,
54    connection_ref: BTreeMap<String, ConnectionRefValue>,
55}
56
57impl std::ops::Deref for WithOptions {
58    type Target = BTreeMap<String, String>;
59
60    fn deref(&self) -> &Self::Target {
61        &self.inner
62    }
63}
64
65impl std::ops::DerefMut for WithOptions {
66    fn deref_mut(&mut self) -> &mut Self::Target {
67        &mut self.inner
68    }
69}
70
71impl WithOptions {
72    /// Create a new [`WithOptions`] from a [`BTreeMap`].
73    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
74        Self {
75            inner,
76            secret_ref: Default::default(),
77            connection_ref: Default::default(),
78        }
79    }
80
81    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
82    pub fn new(
83        inner: BTreeMap<String, String>,
84        secret_ref: BTreeMap<String, SecretRefValue>,
85        connection_ref: BTreeMap<String, ConnectionRefValue>,
86    ) -> Self {
87        Self {
88            inner,
89            secret_ref,
90            connection_ref,
91        }
92    }
93
94    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
95        &mut self.inner
96    }
97
98    /// Take the value of the option map and secret refs.
99    pub fn into_parts(
100        self,
101    ) -> (
102        BTreeMap<String, String>,
103        BTreeMap<String, SecretRefValue>,
104        BTreeMap<String, ConnectionRefValue>,
105    ) {
106        (self.inner, self.secret_ref, self.connection_ref)
107    }
108
109    /// Convert to connector props, remove the key-value pairs used in the top-level.
110    pub fn into_connector_props(self) -> WithOptions {
111        let inner = self
112            .inner
113            .into_iter()
114            .filter(|(key, _)| {
115                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
116            })
117            .collect();
118
119        Self {
120            inner,
121            secret_ref: self.secret_ref,
122            connection_ref: self.connection_ref,
123        }
124    }
125
126    /// Parse the retention seconds from the options.
127    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
128        self.inner
129            .get(options::RETENTION_SECONDS)
130            .and_then(|s| s.parse().ok())
131    }
132
133    /// Get a subset of the options from the given keys.
134    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
135        let inner = keys
136            .into_iter()
137            .filter_map(|k| {
138                self.inner
139                    .get_key_value(k.as_ref())
140                    .map(|(k, v)| (k.clone(), v.clone()))
141            })
142            .collect();
143
144        Self {
145            inner,
146            secret_ref: self.secret_ref.clone(),
147            connection_ref: self.connection_ref.clone(),
148        }
149    }
150
151    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
152        if let Some(inner_val) = self.inner.get(key) {
153            if inner_val.eq_ignore_ascii_case(val) {
154                return true;
155            }
156        }
157        false
158    }
159
160    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
161        &self.secret_ref
162    }
163
164    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
165        &mut self.secret_ref
166    }
167
168    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
169        &self.connection_ref
170    }
171
172    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
173        &mut self.connection_ref
174    }
175
176    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
177        let WithOptions {
178            inner, secret_ref, ..
179        } = WithOptions::try_from(sql_options)?;
180        if secret_ref.is_empty() {
181            Ok(inner)
182        } else {
183            Err(RwError::from(ErrorCode::InvalidParameterValue(
184                "Secret reference is not allowed in OAuth options".to_owned(),
185            )))
186        }
187    }
188
189    pub fn is_source_connector(&self) -> bool {
190        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
191            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
192    }
193}
194
195pub(crate) fn resolve_connection_ref_and_secret_ref(
196    with_options: WithOptions,
197    session: &SessionImpl,
198    object: TelemetryDatabaseObject,
199) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
200    let connector_name = with_options.get_connector();
201    let db_name: &str = &session.database();
202    let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
203
204    let mut connection_id = None;
205    let mut connection_params = None;
206    for connection_ref in connection_refs.values() {
207        // at most one connection ref in the map
208        connection_params = {
209            // get connection params from catalog
210            let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
211                db_name,
212                connection_ref.connection_name.clone(),
213            )?;
214            let connection_catalog =
215                session.get_connection_by_name(schema_name, &connection_name)?;
216            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
217                connection_id = Some(connection_catalog.id);
218                Some(params.clone())
219            } else {
220                return Err(RwError::from(ErrorCode::InvalidParameterValue(
221                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
222        )));
223            }
224        };
225
226        // report to telemetry
227        report_event(
228            PbTelemetryEventStage::CreateStreamJob,
229            "connection_ref",
230            0,
231            connector_name.clone(),
232            Some(object),
233            {
234                connection_params.as_ref().map(|cp| {
235                    jsonbb::json!({
236                        "connection_type": cp.connection_type().as_str_name().to_owned()
237                    })
238                })
239            },
240        );
241    }
242
243    let mut inner_secret_refs = {
244        let mut resolved_secret_refs = BTreeMap::new();
245        for (key, secret_ref) in secret_refs {
246            let (schema_name, secret_name) =
247                Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
248            let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
249            let ref_as = match secret_ref.ref_as {
250                SecretRefAsType::Text => PbRefAsType::Text,
251                SecretRefAsType::File => PbRefAsType::File,
252            };
253            let pb_secret_ref = PbSecretRef {
254                secret_id: secret_catalog.id.secret_id(),
255                ref_as: ref_as.into(),
256            };
257            resolved_secret_refs.insert(key.clone(), pb_secret_ref);
258        }
259        resolved_secret_refs
260    };
261
262    let mut connection_type = PbConnectionType::Unspecified;
263    let connection_params_is_none_flag = connection_params.is_none();
264
265    if let Some(connection_params) = connection_params {
266        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
267        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
268        //
269        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
270        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
271        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
272
273        if let Some(broker_rewrite_map) = connection_params
274            .get_properties()
275            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
276        {
277            if options.contains_key(PRIVATE_LINK_TARGETS_KEY)
278                || options.contains_key(PRIVATELINK_ENDPOINT_KEY)
279            {
280                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
281                    "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
282                    broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
283                ))));
284            }
285        }
286
287        connection_type = connection_params.connection_type();
288        for (k, v) in connection_params.properties {
289            if options.insert(k.clone(), v).is_some() {
290                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
291                    "Duplicated key in both WITH clause and Connection catalog: {}",
292                    k
293                ))));
294            }
295        }
296
297        for (k, v) in connection_params.secret_refs {
298            if inner_secret_refs.insert(k.clone(), v).is_some() {
299                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
300                    "Duplicated key in both WITH clause and Connection catalog: {}",
301                    k
302                ))));
303            }
304        }
305
306        {
307            // check if not mess up with schema registry connection and glue connection
308            if connection_type == PbConnectionType::SchemaRegistry {
309                // Check no AWS related options when using schema registry connection
310                if options
311                    .keys()
312                    .chain(inner_secret_refs.keys())
313                    .any(|k| k.starts_with("aws"))
314                {
315                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
316                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
317                        )));
318                }
319            }
320        }
321    }
322
323    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
324    if connection_params_is_none_flag {
325        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
326    }
327    Ok((
328        WithOptionsSecResolved::new(options, inner_secret_refs),
329        connection_type,
330        connection_id,
331    ))
332}
333
334/// Get the secret id from the name.
335pub(crate) fn resolve_secret_ref_in_with_options(
336    with_options: WithOptions,
337    session: &SessionImpl,
338) -> RwResult<WithOptionsSecResolved> {
339    let (options, secret_refs, _) = with_options.into_parts();
340    let mut resolved_secret_refs = BTreeMap::new();
341    let db_name: &str = &session.database();
342    for (key, secret_ref) in secret_refs {
343        let (schema_name, secret_name) =
344            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
345        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
346        let ref_as = match secret_ref.ref_as {
347            SecretRefAsType::Text => PbRefAsType::Text,
348            SecretRefAsType::File => PbRefAsType::File,
349        };
350        let pb_secret_ref = PbSecretRef {
351            secret_id: secret_catalog.id.secret_id(),
352            ref_as: ref_as.into(),
353        };
354        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
355    }
356    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
357}
358
359pub(crate) fn resolve_privatelink_in_with_option(
360    with_options: &mut WithOptions,
361) -> RwResult<Option<ConnectionId>> {
362    let is_kafka = with_options.is_kafka_connector();
363    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
364
365    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
366    if let Some(endpoint) = privatelink_endpoint {
367        if !is_kafka {
368            return Err(RwError::from(ErrorCode::ProtocolError(
369                "Privatelink is only supported in kafka connector".to_owned(),
370            )));
371        }
372        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
373            .map_err(RwError::from)?;
374    }
375    Ok(None)
376}
377
378impl TryFrom<&[SqlOption]> for WithOptions {
379    type Error = RwError;
380
381    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
382        let mut inner: BTreeMap<String, String> = BTreeMap::new();
383        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
384        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
385        for option in options {
386            let key = option.name.real_value();
387            match &option.value {
388                SqlOptionValue::SecretRef(r) => {
389                    if secret_ref.insert(key.clone(), r.clone()).is_some()
390                        || inner.contains_key(&key)
391                    {
392                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
393                            "Duplicated option: {}",
394                            key
395                        ))));
396                    }
397                    continue;
398                }
399                SqlOptionValue::ConnectionRef(r) => {
400                    if connection_ref.insert(key.clone(), r.clone()).is_some()
401                        || inner.contains_key(&key)
402                    {
403                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
404                            "Duplicated option: {}",
405                            key
406                        ))));
407                    }
408                    continue;
409                }
410                _ => {}
411            }
412            let value: String = match option.value.clone() {
413                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
414                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
415                SqlOptionValue::Value(Value::Number(n)) => n,
416                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
417                _ => {
418                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
419                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
420                            .to_owned(),
421                    )))
422                }
423            };
424            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
425                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
426                    "Duplicated option: {}",
427                    key
428                ))));
429            }
430        }
431
432        Ok(Self {
433            inner,
434            secret_ref,
435            connection_ref,
436        })
437    }
438}
439
440impl TryFrom<&Statement> for WithOptions {
441    type Error = RwError;
442
443    /// Extract options from the `WITH` clause from the given statement.
444    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
445        match statement {
446            // Explain: forward to the inner statement.
447            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
448
449            // View
450            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
451
452            // Sink
453            Statement::CreateSink {
454                stmt:
455                    CreateSinkStatement {
456                        with_properties, ..
457                    },
458            }
459            | Statement::CreateConnection {
460                stmt:
461                    CreateConnectionStatement {
462                        with_properties, ..
463                    },
464            } => Self::try_from(with_properties.0.as_slice()),
465            Statement::CreateSource {
466                stmt:
467                    CreateSourceStatement {
468                        with_properties, ..
469                    },
470                ..
471            } => Self::try_from(with_properties.0.as_slice()),
472            Statement::CreateSubscription {
473                stmt:
474                    CreateSubscriptionStatement {
475                        with_properties, ..
476                    },
477                ..
478            } => Self::try_from(with_properties.0.as_slice()),
479            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
480
481            _ => Ok(Default::default()),
482        }
483    }
484}