risingwave_frontend/utils/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33    BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
34    CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
35    SqlOptionValue, Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
/// Well-known `WITH` option keys that are consumed at the top level of a DDL rather than
/// forwarded to the connector (see `WithOptions::into_connector_props`).
pub mod options {
    /// Key of the option holding the retention period, in seconds.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
48
49/// Options or properties extracted from the `WITH` clause of DDLs.
50#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52    inner: BTreeMap<String, String>,
53    secret_ref: BTreeMap<String, SecretRefValue>,
54    connection_ref: BTreeMap<String, ConnectionRefValue>,
55    backfill_order_strategy: BackfillOrderStrategy,
56}
57
58impl GetKeyIter for WithOptions {
59    fn key_iter(&self) -> impl Iterator<Item = &str> {
60        self.inner.keys().map(|s| s.as_str())
61    }
62}
63
64impl Get for WithOptions {
65    fn get(&self, key: &str) -> Option<&String> {
66        self.inner.get(key)
67    }
68}
69
70impl std::ops::Deref for WithOptions {
71    type Target = BTreeMap<String, String>;
72
73    fn deref(&self) -> &Self::Target {
74        &self.inner
75    }
76}
77
78impl std::ops::DerefMut for WithOptions {
79    fn deref_mut(&mut self) -> &mut Self::Target {
80        &mut self.inner
81    }
82}
83
84impl WithOptions {
85    /// Create a new [`WithOptions`] from a [`BTreeMap`].
86    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
87        Self {
88            inner,
89            secret_ref: Default::default(),
90            connection_ref: Default::default(),
91            backfill_order_strategy: Default::default(),
92        }
93    }
94
95    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
96    pub fn new(
97        inner: BTreeMap<String, String>,
98        secret_ref: BTreeMap<String, SecretRefValue>,
99        connection_ref: BTreeMap<String, ConnectionRefValue>,
100    ) -> Self {
101        Self {
102            inner,
103            secret_ref,
104            connection_ref,
105            backfill_order_strategy: Default::default(),
106        }
107    }
108
109    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
110        &mut self.inner
111    }
112
113    /// Take the value of the option map and secret refs.
114    pub fn into_parts(
115        self,
116    ) -> (
117        BTreeMap<String, String>,
118        BTreeMap<String, SecretRefValue>,
119        BTreeMap<String, ConnectionRefValue>,
120    ) {
121        (self.inner, self.secret_ref, self.connection_ref)
122    }
123
124    /// Convert to connector props, remove the key-value pairs used in the top-level.
125    pub fn into_connector_props(self) -> WithOptions {
126        let inner = self
127            .inner
128            .into_iter()
129            .filter(|(key, _)| {
130                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
131            })
132            .collect();
133
134        Self {
135            inner,
136            secret_ref: self.secret_ref,
137            connection_ref: self.connection_ref,
138            backfill_order_strategy: self.backfill_order_strategy,
139        }
140    }
141
142    /// Parse the retention seconds from the options.
143    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
144        self.inner
145            .get(options::RETENTION_SECONDS)
146            .and_then(|s| s.parse().ok())
147    }
148
149    /// Get a subset of the options from the given keys.
150    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
151        let inner = keys
152            .into_iter()
153            .filter_map(|k| {
154                self.inner
155                    .get_key_value(k.as_ref())
156                    .map(|(k, v)| (k.clone(), v.clone()))
157            })
158            .collect();
159
160        Self {
161            inner,
162            secret_ref: self.secret_ref.clone(),
163            connection_ref: self.connection_ref.clone(),
164            backfill_order_strategy: self.backfill_order_strategy.clone(),
165        }
166    }
167
168    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
169        if let Some(inner_val) = self.inner.get(key)
170            && inner_val.eq_ignore_ascii_case(val)
171        {
172            return true;
173        }
174        false
175    }
176
177    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
178        &self.secret_ref
179    }
180
181    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
182        &mut self.secret_ref
183    }
184
185    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
186        &self.connection_ref
187    }
188
189    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
190        &mut self.connection_ref
191    }
192
193    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
194        let WithOptions {
195            inner, secret_ref, ..
196        } = WithOptions::try_from(sql_options)?;
197        if secret_ref.is_empty() {
198            Ok(inner)
199        } else {
200            Err(RwError::from(ErrorCode::InvalidParameterValue(
201                "Secret reference is not allowed in OAuth options".to_owned(),
202            )))
203        }
204    }
205
206    pub fn is_source_connector(&self) -> bool {
207        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
208            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
209    }
210
211    pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
212        self.backfill_order_strategy.clone()
213    }
214}
215
216/// Resolves connection references and secret references in `WithOptions`.
217///
218/// This function takes `WithOptions` (typically from a CREATE/ALTER statement),
219/// resolves any CONNECTION and SECRET references, and merges their properties
220/// with the directly specified options.
221///
222/// # Arguments
223/// * `with_options` - The original `WithOptions` containing user-specified options and references
224/// * `session` - The current user session, used to access catalogs and verify permissions
225/// * `object` - Optional telemetry information about the database object being created/altered
226///
227/// # Returns
228/// A tuple containing:
229/// * `WithOptionsSecResolved` - `WithOptions` with all references resolved and merged
230/// * `PbConnectionType` - The type of the referenced connection (if any)
231/// * `Option<u32>` - The ID of the referenced connection (if any)
232///
233/// # Workflow
234/// 1. Extract connector name, options, secret refs, and connection refs from `with_options`
235/// 2. Resolve any CONNECTION references by looking them up in the catalog
236/// 3. Resolve any SECRET references by looking them up in the catalog
237/// 4. Merge properties from CONNECTION with directly specified options
238/// 5. Perform validation to ensure no conflicts between options
239/// 6. Return the merged options, connection type, and connection ID
240pub(crate) fn resolve_connection_ref_and_secret_ref(
241    with_options: WithOptions,
242    session: &SessionImpl,
243    object: Option<TelemetryDatabaseObject>,
244) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
245    let connector_name = with_options.get_connector();
246    let db_name: &str = &session.database();
247    let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
248
249    let mut connection_id = None;
250    let mut connection_params = None;
251    for connection_ref in connection_refs.values() {
252        // at most one connection ref in the map
253        connection_params = {
254            // get connection params from catalog
255            let (schema_name, connection_name) =
256                Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
257            let connection_catalog =
258                session.get_connection_by_name(schema_name, &connection_name)?;
259            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
260                connection_id = Some(connection_catalog.id);
261                Some(params.clone())
262            } else {
263                return Err(RwError::from(ErrorCode::InvalidParameterValue(
264                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
265                )));
266            }
267        };
268
269        // Report telemetry event for connection reference if an object is provided
270        if let Some(object) = object {
271            // report to telemetry
272            report_event(
273                PbTelemetryEventStage::CreateStreamJob,
274                "connection_ref",
275                0,
276                connector_name.clone(),
277                Some(object),
278                {
279                    connection_params.as_ref().map(|cp| {
280                        jsonbb::json!({
281                            "connection_type": cp.connection_type().as_str_name().to_owned()
282                        })
283                    })
284                },
285            );
286        }
287    }
288
289    let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
290
291    // Initialize connection_type to Unspecified (will be updated if connection is used)
292    let mut connection_type = PbConnectionType::Unspecified;
293    let connection_params_is_none_flag = connection_params.is_none();
294
295    // If we have a connection, merge its properties with directly specified options
296    if let Some(connection_params) = connection_params {
297        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
298        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
299        //
300        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
301        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
302        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
303
304        if let Some(broker_rewrite_map) = connection_params
305            .get_properties()
306            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
307            && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
308                || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
309        {
310            return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
311                "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
312                broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
313            ))));
314        }
315
316        // Extract connection type and merge properties
317        connection_type = connection_params.connection_type();
318        for (k, v) in connection_params.properties {
319            if options.insert(k.clone(), v).is_some() {
320                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
321                    "Duplicated key in both WITH clause and Connection catalog: {}",
322                    k
323                ))));
324            }
325        }
326
327        // Merge secret references from connection
328        for (k, v) in connection_params.secret_refs {
329            if inner_secret_refs.insert(k.clone(), v).is_some() {
330                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
331                    "Duplicated key in both WITH clause and Connection catalog: {}",
332                    k
333                ))));
334            }
335        }
336
337        // Schema Registry specific validation
338        {
339            if connection_type == PbConnectionType::SchemaRegistry {
340                // Ensure no AWS/Glue options are provided when using schema registry connection
341                if options
342                    .keys()
343                    .chain(inner_secret_refs.keys())
344                    .any(|k| k.starts_with("aws"))
345                {
346                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
347                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
348                        )));
349                }
350            }
351        }
352    }
353
354    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
355    if connection_params_is_none_flag {
356        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
357    }
358    Ok((
359        WithOptionsSecResolved::new(options, inner_secret_refs),
360        connection_type,
361        connection_id,
362    ))
363}
364
365/// Get the secret id from the name.
366pub(crate) fn resolve_secret_ref_in_with_options(
367    with_options: WithOptions,
368    session: &SessionImpl,
369) -> RwResult<WithOptionsSecResolved> {
370    let (options, secret_refs, _) = with_options.into_parts();
371    let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
372    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
373}
374
375fn resolve_secret_refs_inner(
376    secret_refs: BTreeMap<String, SecretRefValue>,
377    session: &SessionImpl,
378) -> RwResult<BTreeMap<String, PbSecretRef>> {
379    let db_name: &str = &session.database();
380    let mut resolved_secret_refs = BTreeMap::new();
381    for (key, secret_ref) in secret_refs {
382        let (schema_name, secret_name) =
383            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
384        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
385        let ref_as = match secret_ref.ref_as {
386            SecretRefAsType::Text => PbRefAsType::Text,
387            SecretRefAsType::File => PbRefAsType::File,
388        };
389        let pb_secret_ref = PbSecretRef {
390            secret_id: secret_catalog.id.secret_id(),
391            ref_as: ref_as.into(),
392        };
393        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
394    }
395    Ok(resolved_secret_refs)
396}
397
398pub(crate) fn resolve_privatelink_in_with_option(
399    with_options: &mut WithOptions,
400) -> RwResult<Option<ConnectionId>> {
401    let is_kafka = with_options.is_kafka_connector();
402    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
403
404    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
405    if let Some(endpoint) = privatelink_endpoint {
406        if !is_kafka {
407            return Err(RwError::from(ErrorCode::ProtocolError(
408                "Privatelink is only supported in kafka connector".to_owned(),
409            )));
410        }
411        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
412            .map_err(RwError::from)?;
413    }
414    Ok(None)
415}
416
417impl TryFrom<&[SqlOption]> for WithOptions {
418    type Error = RwError;
419
420    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
421        let mut inner: BTreeMap<String, String> = BTreeMap::new();
422        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
423        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
424        let mut backfill_order_strategy = BackfillOrderStrategy::Default;
425        for option in options {
426            let key = option.name.real_value();
427            match &option.value {
428                SqlOptionValue::SecretRef(r) => {
429                    if secret_ref.insert(key.clone(), r.clone()).is_some()
430                        || inner.contains_key(&key)
431                    {
432                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
433                            "Duplicated option: {}",
434                            key
435                        ))));
436                    }
437                    continue;
438                }
439                SqlOptionValue::ConnectionRef(r) => {
440                    if connection_ref.insert(key.clone(), r.clone()).is_some()
441                        || inner.contains_key(&key)
442                    {
443                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
444                            "Duplicated option: {}",
445                            key
446                        ))));
447                    }
448                    continue;
449                }
450                SqlOptionValue::BackfillOrder(b) => {
451                    backfill_order_strategy = b.clone();
452                    continue;
453                }
454                _ => {}
455            }
456            let value: String = match option.value.clone() {
457                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
458                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
459                SqlOptionValue::Value(Value::Number(n)) => n,
460                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
461                _ => {
462                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
463                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
464                            .to_owned(),
465                    )))
466                }
467            };
468            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
469                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
470                    "Duplicated option: {}",
471                    key
472                ))));
473            }
474        }
475
476        Ok(Self {
477            inner,
478            secret_ref,
479            connection_ref,
480            backfill_order_strategy,
481        })
482    }
483}
484
485impl TryFrom<&Statement> for WithOptions {
486    type Error = RwError;
487
488    /// Extract options from the `WITH` clause from the given statement.
489    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
490        match statement {
491            // Explain: forward to the inner statement.
492            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
493
494            // View
495            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
496
497            // Sink
498            Statement::CreateSink {
499                stmt:
500                    CreateSinkStatement {
501                        with_properties, ..
502                    },
503            }
504            | Statement::CreateConnection {
505                stmt:
506                    CreateConnectionStatement {
507                        with_properties, ..
508                    },
509            } => Self::try_from(with_properties.0.as_slice()),
510            Statement::CreateSource {
511                stmt:
512                    CreateSourceStatement {
513                        with_properties, ..
514                    },
515                ..
516            } => Self::try_from(with_properties.0.as_slice()),
517            Statement::CreateSubscription {
518                stmt:
519                    CreateSubscriptionStatement {
520                        with_properties, ..
521                    },
522                ..
523            } => Self::try_from(with_properties.0.as_slice()),
524            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
525
526            _ => Ok(Default::default()),
527        }
528    }
529}