// risingwave_frontend/utils/with_options.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33    BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
34    CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
35    SqlOptionValue, Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
/// Well-known option keys that the frontend interprets itself.
pub mod options {
    /// Option holding the retention period, in seconds.
    ///
    /// Stripped out by `WithOptions::into_connector_props`, so it never reaches a connector.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
48
49/// Options or properties extracted from the `WITH` clause of DDLs.
50#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52    inner: BTreeMap<String, String>,
53    secret_ref: BTreeMap<String, SecretRefValue>,
54    connection_ref: BTreeMap<String, ConnectionRefValue>,
55    backfill_order_strategy: BackfillOrderStrategy,
56}
57
58impl GetKeyIter for WithOptions {
59    fn key_iter(&self) -> impl Iterator<Item = &str> {
60        self.inner.keys().map(|s| s.as_str())
61    }
62}
63
64impl Get for WithOptions {
65    fn get(&self, key: &str) -> Option<&String> {
66        self.inner.get(key)
67    }
68}
69
70impl std::ops::Deref for WithOptions {
71    type Target = BTreeMap<String, String>;
72
73    fn deref(&self) -> &Self::Target {
74        &self.inner
75    }
76}
77
78impl std::ops::DerefMut for WithOptions {
79    fn deref_mut(&mut self) -> &mut Self::Target {
80        &mut self.inner
81    }
82}
83
84impl WithOptions {
85    /// Create a new [`WithOptions`] from a [`BTreeMap`].
86    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
87        Self {
88            inner,
89            secret_ref: Default::default(),
90            connection_ref: Default::default(),
91            backfill_order_strategy: Default::default(),
92        }
93    }
94
95    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
96    pub fn new(
97        inner: BTreeMap<String, String>,
98        secret_ref: BTreeMap<String, SecretRefValue>,
99        connection_ref: BTreeMap<String, ConnectionRefValue>,
100    ) -> Self {
101        Self {
102            inner,
103            secret_ref,
104            connection_ref,
105            backfill_order_strategy: Default::default(),
106        }
107    }
108
109    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
110        &mut self.inner
111    }
112
113    /// Take the value of the option map and secret refs.
114    pub fn into_parts(
115        self,
116    ) -> (
117        BTreeMap<String, String>,
118        BTreeMap<String, SecretRefValue>,
119        BTreeMap<String, ConnectionRefValue>,
120    ) {
121        (self.inner, self.secret_ref, self.connection_ref)
122    }
123
124    /// Convert to connector props, remove the key-value pairs used in the top-level.
125    pub fn into_connector_props(self) -> WithOptions {
126        let inner = self
127            .inner
128            .into_iter()
129            .filter(|(key, _)| {
130                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
131            })
132            .collect();
133
134        Self {
135            inner,
136            secret_ref: self.secret_ref,
137            connection_ref: self.connection_ref,
138            backfill_order_strategy: self.backfill_order_strategy,
139        }
140    }
141
142    /// Parse the retention seconds from the options.
143    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
144        self.inner
145            .get(options::RETENTION_SECONDS)
146            .and_then(|s| s.parse().ok())
147    }
148
149    /// Get a subset of the options from the given keys.
150    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
151        let inner = keys
152            .into_iter()
153            .filter_map(|k| {
154                self.inner
155                    .get_key_value(k.as_ref())
156                    .map(|(k, v)| (k.clone(), v.clone()))
157            })
158            .collect();
159
160        Self {
161            inner,
162            secret_ref: self.secret_ref.clone(),
163            connection_ref: self.connection_ref.clone(),
164            backfill_order_strategy: self.backfill_order_strategy.clone(),
165        }
166    }
167
168    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
169        if let Some(inner_val) = self.inner.get(key)
170            && inner_val.eq_ignore_ascii_case(val)
171        {
172            return true;
173        }
174        false
175    }
176
177    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
178        &self.secret_ref
179    }
180
181    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
182        &mut self.secret_ref
183    }
184
185    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
186        &self.connection_ref
187    }
188
189    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
190        &mut self.connection_ref
191    }
192
193    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
194        let WithOptions {
195            inner, secret_ref, ..
196        } = WithOptions::try_from(sql_options)?;
197        if secret_ref.is_empty() {
198            Ok(inner)
199        } else {
200            Err(RwError::from(ErrorCode::InvalidParameterValue(
201                "Secret reference is not allowed in OAuth options".to_owned(),
202            )))
203        }
204    }
205
206    pub fn is_source_connector(&self) -> bool {
207        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
208            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
209    }
210
211    pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
212        self.backfill_order_strategy.clone()
213    }
214}
215
216/// Resolves connection references and secret references in `WithOptions`.
217///
218/// This function takes `WithOptions` (typically from a CREATE/ALTER statement),
219/// resolves any CONNECTION and SECRET references, and merges their properties
220/// with the directly specified options.
221///
222/// # Arguments
223/// * `with_options` - The original `WithOptions` containing user-specified options and references
224/// * `session` - The current user session, used to access catalogs and verify permissions
225/// * `object` - Optional telemetry information about the database object being created/altered
226///
227/// # Returns
228/// A tuple containing:
229/// * `WithOptionsSecResolved` - `WithOptions` with all references resolved and merged
230/// * `PbConnectionType` - The type of the referenced connection (if any)
231/// * `Option<u32>` - The ID of the referenced connection (if any)
232///
233/// # Workflow
234/// 1. Extract connector name, options, secret refs, and connection refs from `with_options`
235/// 2. Resolve any CONNECTION references by looking them up in the catalog
236/// 3. Resolve any SECRET references by looking them up in the catalog
237/// 4. Merge properties from CONNECTION with directly specified options
238/// 5. Perform validation to ensure no conflicts between options
239/// 6. Return the merged options, connection type, and connection ID
240pub(crate) fn resolve_connection_ref_and_secret_ref(
241    with_options: WithOptions,
242    session: &SessionImpl,
243    object: Option<TelemetryDatabaseObject>,
244) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
245    let connector_name = with_options.get_connector();
246    let db_name: &str = &session.database();
247    let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
248
249    let mut connection_id = None;
250    let mut connection_params = None;
251    for connection_ref in connection_refs.values() {
252        // at most one connection ref in the map
253        connection_params = {
254            // get connection params from catalog
255            let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
256                db_name,
257                connection_ref.connection_name.clone(),
258            )?;
259            let connection_catalog =
260                session.get_connection_by_name(schema_name, &connection_name)?;
261            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
262                connection_id = Some(connection_catalog.id);
263                Some(params.clone())
264            } else {
265                return Err(RwError::from(ErrorCode::InvalidParameterValue(
266                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
267                )));
268            }
269        };
270
271        // Report telemetry event for connection reference if an object is provided
272        if let Some(object) = object {
273            // report to telemetry
274            report_event(
275                PbTelemetryEventStage::CreateStreamJob,
276                "connection_ref",
277                0,
278                connector_name.clone(),
279                Some(object),
280                {
281                    connection_params.as_ref().map(|cp| {
282                        jsonbb::json!({
283                            "connection_type": cp.connection_type().as_str_name().to_owned()
284                        })
285                    })
286                },
287            );
288        }
289    }
290
291    let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
292
293    // Initialize connection_type to Unspecified (will be updated if connection is used)
294    let mut connection_type = PbConnectionType::Unspecified;
295    let connection_params_is_none_flag = connection_params.is_none();
296
297    // If we have a connection, merge its properties with directly specified options
298    if let Some(connection_params) = connection_params {
299        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
300        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
301        //
302        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
303        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
304        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
305
306        if let Some(broker_rewrite_map) = connection_params
307            .get_properties()
308            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
309            && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
310                || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
311        {
312            return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
313                "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
314                broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
315            ))));
316        }
317
318        // Extract connection type and merge properties
319        connection_type = connection_params.connection_type();
320        for (k, v) in connection_params.properties {
321            if options.insert(k.clone(), v).is_some() {
322                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
323                    "Duplicated key in both WITH clause and Connection catalog: {}",
324                    k
325                ))));
326            }
327        }
328
329        // Merge secret references from connection
330        for (k, v) in connection_params.secret_refs {
331            if inner_secret_refs.insert(k.clone(), v).is_some() {
332                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
333                    "Duplicated key in both WITH clause and Connection catalog: {}",
334                    k
335                ))));
336            }
337        }
338
339        // Schema Registry specific validation
340        {
341            if connection_type == PbConnectionType::SchemaRegistry {
342                // Ensure no AWS/Glue options are provided when using schema registry connection
343                if options
344                    .keys()
345                    .chain(inner_secret_refs.keys())
346                    .any(|k| k.starts_with("aws"))
347                {
348                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
349                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
350                        )));
351                }
352            }
353        }
354    }
355
356    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
357    if connection_params_is_none_flag {
358        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
359    }
360    Ok((
361        WithOptionsSecResolved::new(options, inner_secret_refs),
362        connection_type,
363        connection_id,
364    ))
365}
366
367/// Get the secret id from the name.
368pub(crate) fn resolve_secret_ref_in_with_options(
369    with_options: WithOptions,
370    session: &SessionImpl,
371) -> RwResult<WithOptionsSecResolved> {
372    let (options, secret_refs, _) = with_options.into_parts();
373    let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
374    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
375}
376
377fn resolve_secret_refs_inner(
378    secret_refs: BTreeMap<String, SecretRefValue>,
379    session: &SessionImpl,
380) -> RwResult<BTreeMap<String, PbSecretRef>> {
381    let db_name: &str = &session.database();
382    let mut resolved_secret_refs = BTreeMap::new();
383    for (key, secret_ref) in secret_refs {
384        let (schema_name, secret_name) =
385            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
386        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
387        let ref_as = match secret_ref.ref_as {
388            SecretRefAsType::Text => PbRefAsType::Text,
389            SecretRefAsType::File => PbRefAsType::File,
390        };
391        let pb_secret_ref = PbSecretRef {
392            secret_id: secret_catalog.id.secret_id(),
393            ref_as: ref_as.into(),
394        };
395        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
396    }
397    Ok(resolved_secret_refs)
398}
399
400pub(crate) fn resolve_privatelink_in_with_option(
401    with_options: &mut WithOptions,
402) -> RwResult<Option<ConnectionId>> {
403    let is_kafka = with_options.is_kafka_connector();
404    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
405
406    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
407    if let Some(endpoint) = privatelink_endpoint {
408        if !is_kafka {
409            return Err(RwError::from(ErrorCode::ProtocolError(
410                "Privatelink is only supported in kafka connector".to_owned(),
411            )));
412        }
413        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
414            .map_err(RwError::from)?;
415    }
416    Ok(None)
417}
418
419impl TryFrom<&[SqlOption]> for WithOptions {
420    type Error = RwError;
421
422    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
423        let mut inner: BTreeMap<String, String> = BTreeMap::new();
424        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
425        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
426        let mut backfill_order_strategy = BackfillOrderStrategy::Default;
427        for option in options {
428            let key = option.name.real_value();
429            match &option.value {
430                SqlOptionValue::SecretRef(r) => {
431                    if secret_ref.insert(key.clone(), r.clone()).is_some()
432                        || inner.contains_key(&key)
433                    {
434                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
435                            "Duplicated option: {}",
436                            key
437                        ))));
438                    }
439                    continue;
440                }
441                SqlOptionValue::ConnectionRef(r) => {
442                    if connection_ref.insert(key.clone(), r.clone()).is_some()
443                        || inner.contains_key(&key)
444                    {
445                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
446                            "Duplicated option: {}",
447                            key
448                        ))));
449                    }
450                    continue;
451                }
452                SqlOptionValue::BackfillOrder(b) => {
453                    backfill_order_strategy = b.clone();
454                    continue;
455                }
456                _ => {}
457            }
458            let value: String = match option.value.clone() {
459                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
460                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
461                SqlOptionValue::Value(Value::Number(n)) => n,
462                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
463                _ => {
464                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
465                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
466                            .to_owned(),
467                    )))
468                }
469            };
470            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
471                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
472                    "Duplicated option: {}",
473                    key
474                ))));
475            }
476        }
477
478        Ok(Self {
479            inner,
480            secret_ref,
481            connection_ref,
482            backfill_order_strategy,
483        })
484    }
485}
486
487impl TryFrom<&Statement> for WithOptions {
488    type Error = RwError;
489
490    /// Extract options from the `WITH` clause from the given statement.
491    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
492        match statement {
493            // Explain: forward to the inner statement.
494            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
495
496            // View
497            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
498
499            // Sink
500            Statement::CreateSink {
501                stmt:
502                    CreateSinkStatement {
503                        with_properties, ..
504                    },
505            }
506            | Statement::CreateConnection {
507                stmt:
508                    CreateConnectionStatement {
509                        with_properties, ..
510                    },
511            } => Self::try_from(with_properties.0.as_slice()),
512            Statement::CreateSource {
513                stmt:
514                    CreateSourceStatement {
515                        with_properties, ..
516                    },
517                ..
518            } => Self::try_from(with_properties.0.as_slice()),
519            Statement::CreateSubscription {
520                stmt:
521                    CreateSubscriptionStatement {
522                        with_properties, ..
523                    },
524                ..
525            } => Self::try_from(with_properties.0.as_slice()),
526            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
527
528            _ => Ok(Default::default()),
529        }
530    }
531}