risingwave_frontend/utils/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::plan_common::SourceRefreshMode;
30use risingwave_pb::plan_common::source_refresh_mode::{
31    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
32};
33use risingwave_pb::secret::PbSecretRef;
34use risingwave_pb::secret::secret_ref::PbRefAsType;
35use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
36use risingwave_sqlparser::ast::{
37    BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
38    CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
39    SqlOptionValue, Statement, Value,
40};
41use thiserror_ext::AsReport;
42
43use super::OverwriteOptions;
44use crate::Binder;
45use crate::error::{ErrorCode, Result as RwResult, RwError};
46use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
47use crate::session::SessionImpl;
48use crate::telemetry::report_event;
49
/// Well-known option keys recognised in the `WITH` clause.
pub mod options {
    /// Key of the option that [`super::WithOptions::retention_seconds`] parses.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
53
/// `WITH` key selecting the source refresh mode (`STREAMING` or `FULL_RELOAD`, case-insensitive).
pub const SOURCE_REFRESH_MODE_KEY: &str = "refresh_mode";
/// `WITH` key giving the refresh interval in seconds; only accepted together with `FULL_RELOAD`.
pub const SOURCE_REFRESH_INTERVAL_SEC_KEY: &str = "refresh_interval_sec";
56
57/// Options or properties extracted from the `WITH` clause of DDLs.
58#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
59pub struct WithOptions {
60    inner: BTreeMap<String, String>,
61    secret_ref: BTreeMap<String, SecretRefValue>,
62    connection_ref: BTreeMap<String, ConnectionRefValue>,
63    backfill_order_strategy: BackfillOrderStrategy,
64}
65
66impl GetKeyIter for WithOptions {
67    fn key_iter(&self) -> impl Iterator<Item = &str> {
68        self.inner.keys().map(|s| s.as_str())
69    }
70}
71
72impl Get for WithOptions {
73    fn get(&self, key: &str) -> Option<&String> {
74        self.inner.get(key)
75    }
76}
77
78impl std::ops::Deref for WithOptions {
79    type Target = BTreeMap<String, String>;
80
81    fn deref(&self) -> &Self::Target {
82        &self.inner
83    }
84}
85
86impl std::ops::DerefMut for WithOptions {
87    fn deref_mut(&mut self) -> &mut Self::Target {
88        &mut self.inner
89    }
90}
91
92impl WithOptions {
93    /// Create a new [`WithOptions`] from a [`BTreeMap`].
94    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
95        Self {
96            inner,
97            secret_ref: Default::default(),
98            connection_ref: Default::default(),
99            backfill_order_strategy: Default::default(),
100        }
101    }
102
103    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
104    pub fn new(
105        inner: BTreeMap<String, String>,
106        secret_ref: BTreeMap<String, SecretRefValue>,
107        connection_ref: BTreeMap<String, ConnectionRefValue>,
108    ) -> Self {
109        Self {
110            inner,
111            secret_ref,
112            connection_ref,
113            backfill_order_strategy: Default::default(),
114        }
115    }
116
117    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
118        &mut self.inner
119    }
120
121    /// Take the value of the option map and secret refs.
122    pub fn into_parts(
123        self,
124    ) -> (
125        BTreeMap<String, String>,
126        BTreeMap<String, SecretRefValue>,
127        BTreeMap<String, ConnectionRefValue>,
128    ) {
129        (self.inner, self.secret_ref, self.connection_ref)
130    }
131
132    /// Convert to connector props, remove the key-value pairs used in the top-level.
133    pub fn into_connector_props(self) -> WithOptions {
134        let inner = self
135            .inner
136            .into_iter()
137            .filter(|(key, _)| {
138                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
139            })
140            .collect();
141
142        Self {
143            inner,
144            secret_ref: self.secret_ref,
145            connection_ref: self.connection_ref,
146            backfill_order_strategy: self.backfill_order_strategy,
147        }
148    }
149
150    /// Parse the retention seconds from the options.
151    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
152        self.inner
153            .get(options::RETENTION_SECONDS)
154            .and_then(|s| s.parse().ok())
155    }
156
157    /// Get a subset of the options from the given keys.
158    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
159        let inner = keys
160            .into_iter()
161            .filter_map(|k| {
162                self.inner
163                    .get_key_value(k.as_ref())
164                    .map(|(k, v)| (k.clone(), v.clone()))
165            })
166            .collect();
167
168        Self {
169            inner,
170            secret_ref: self.secret_ref.clone(),
171            connection_ref: self.connection_ref.clone(),
172            backfill_order_strategy: self.backfill_order_strategy.clone(),
173        }
174    }
175
176    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
177        if let Some(inner_val) = self.inner.get(key)
178            && inner_val.eq_ignore_ascii_case(val)
179        {
180            return true;
181        }
182        false
183    }
184
185    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
186        &self.secret_ref
187    }
188
189    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
190        &mut self.secret_ref
191    }
192
193    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
194        &self.connection_ref
195    }
196
197    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
198        &mut self.connection_ref
199    }
200
201    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
202        let WithOptions {
203            inner, secret_ref, ..
204        } = WithOptions::try_from(sql_options)?;
205        if secret_ref.is_empty() {
206            Ok(inner)
207        } else {
208            Err(RwError::from(ErrorCode::InvalidParameterValue(
209                "Secret reference is not allowed in OAuth options".to_owned(),
210            )))
211        }
212    }
213
214    pub fn is_source_connector(&self) -> bool {
215        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
216            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
217    }
218
219    pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
220        self.backfill_order_strategy.clone()
221    }
222}
223
224/// Resolves connection references and secret references in `WithOptions`.
225///
226/// This function takes `WithOptions` (typically from a CREATE/ALTER statement),
227/// resolves any CONNECTION and SECRET references, and merges their properties
228/// with the directly specified options.
229///
230/// # Arguments
231/// * `with_options` - The original `WithOptions` containing user-specified options and references
232/// * `session` - The current user session, used to access catalogs and verify permissions
233/// * `object` - Optional telemetry information about the database object being created/altered
234///
235/// # Returns
236/// A tuple containing:
237/// * `WithOptionsSecResolved` - `WithOptions` with all references resolved and merged
238/// * `PbConnectionType` - The type of the referenced connection (if any)
239/// * `Option<u32>` - The ID of the referenced connection (if any)
240///
241/// # Workflow
242/// 1. Extract connector name, options, secret refs, and connection refs from `with_options`
243/// 2. Resolve any CONNECTION references by looking them up in the catalog
244/// 3. Resolve any SECRET references by looking them up in the catalog
245/// 4. Merge properties from CONNECTION with directly specified options
246/// 5. Perform validation to ensure no conflicts between options
247/// 6. Return the merged options, connection type, and connection ID
248pub(crate) fn resolve_connection_ref_and_secret_ref(
249    with_options: WithOptions,
250    session: &SessionImpl,
251    object: Option<TelemetryDatabaseObject>,
252) -> RwResult<(
253    WithOptionsSecResolved,
254    PbConnectionType,
255    Option<ConnectionId>,
256)> {
257    let connector_name = with_options.get_connector();
258    let db_name: &str = &session.database();
259    let (mut options, secret_refs, connection_refs) = with_options.into_parts();
260
261    let mut connection_id = None;
262    let mut connection_params = None;
263    for connection_ref in connection_refs.values() {
264        // at most one connection ref in the map
265        connection_params = {
266            // get connection params from catalog
267            let (schema_name, connection_name) =
268                Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
269            let connection_catalog =
270                session.get_connection_by_name(schema_name, &connection_name)?;
271            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
272                connection_id = Some(connection_catalog.id);
273                Some(params.clone())
274            } else {
275                return Err(RwError::from(ErrorCode::InvalidParameterValue(
276                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
277                )));
278            }
279        };
280
281        // Report telemetry event for connection reference if an object is provided
282        if let Some(object) = object {
283            // report to telemetry
284            report_event(
285                PbTelemetryEventStage::CreateStreamJob,
286                "connection_ref",
287                0,
288                connector_name.clone(),
289                Some(object),
290                {
291                    connection_params.as_ref().map(|cp| {
292                        jsonbb::json!({
293                            "connection_type": cp.connection_type().as_str_name().to_owned()
294                        })
295                    })
296                },
297            );
298        }
299    }
300
301    let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
302
303    // Initialize connection_type to Unspecified (will be updated if connection is used)
304    let mut connection_type = PbConnectionType::Unspecified;
305    let connection_params_is_none_flag = connection_params.is_none();
306
307    // If we have a connection, merge its properties with directly specified options
308    if let Some(connection_params) = connection_params {
309        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
310        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
311        //
312        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
313        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
314        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
315
316        if let Some(broker_rewrite_map) = connection_params
317            .get_properties()
318            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
319            && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
320                || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
321        {
322            return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
323                "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
324                broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
325            ))));
326        }
327
328        // Extract connection type and merge properties
329        connection_type = connection_params.connection_type();
330        for (k, v) in connection_params.properties {
331            if options.insert(k.clone(), v).is_some() {
332                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
333                    "Duplicated key in both WITH clause and Connection catalog: {}",
334                    k
335                ))));
336            }
337        }
338
339        // Merge secret references from connection
340        for (k, v) in connection_params.secret_refs {
341            if inner_secret_refs.insert(k.clone(), v).is_some() {
342                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
343                    "Duplicated key in both WITH clause and Connection catalog: {}",
344                    k
345                ))));
346            }
347        }
348
349        // Schema Registry specific validation
350        {
351            if connection_type == PbConnectionType::SchemaRegistry {
352                // Ensure no AWS/Glue options are provided when using schema registry connection
353                if options
354                    .keys()
355                    .chain(inner_secret_refs.keys())
356                    .any(|k| k.starts_with("aws"))
357                {
358                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
359                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
360                        )));
361                }
362            }
363        }
364    }
365
366    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
367    if connection_params_is_none_flag {
368        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
369    }
370    Ok((
371        WithOptionsSecResolved::new(options, inner_secret_refs),
372        connection_type,
373        connection_id,
374    ))
375}
376
377/// Get the secret id from the name.
378pub(crate) fn resolve_secret_ref_in_with_options(
379    with_options: WithOptions,
380    session: &SessionImpl,
381) -> RwResult<WithOptionsSecResolved> {
382    let (options, secret_refs, _) = with_options.into_parts();
383    let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
384    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
385}
386
387fn resolve_secret_refs_inner(
388    secret_refs: BTreeMap<String, SecretRefValue>,
389    session: &SessionImpl,
390) -> RwResult<BTreeMap<String, PbSecretRef>> {
391    let db_name: &str = &session.database();
392    let mut resolved_secret_refs = BTreeMap::new();
393    for (key, secret_ref) in secret_refs {
394        let (schema_name, secret_name) =
395            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
396        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
397        let ref_as = match secret_ref.ref_as {
398            SecretRefAsType::Text => PbRefAsType::Text,
399            SecretRefAsType::File => PbRefAsType::File,
400        };
401        let pb_secret_ref = PbSecretRef {
402            secret_id: secret_catalog.id,
403            ref_as: ref_as.into(),
404        };
405        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
406    }
407    Ok(resolved_secret_refs)
408}
409
410pub(crate) fn resolve_source_refresh_mode_in_with_option(
411    with_options: &mut WithOptions,
412) -> RwResult<Option<SourceRefreshMode>> {
413    let source_refresh_interval_sec =
414        {
415            if let Some(maybe_int) = with_options.remove(SOURCE_REFRESH_INTERVAL_SEC_KEY) {
416                let some_int = maybe_int.parse::<i64>().map_err(|e| {
417                RwError::from(ErrorCode::InvalidParameterValue(format!(
418                    "`{}` must be a positive integer and larger than 0, but got: {} (error: {})",
419                    SOURCE_REFRESH_INTERVAL_SEC_KEY, maybe_int, e.as_report()
420                )))
421            })?;
422                if some_int <= 0 {
423                    return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
424                        "`{}` must be larger than 0, but got: {}",
425                        SOURCE_REFRESH_INTERVAL_SEC_KEY, some_int
426                    ))));
427                }
428                Some(some_int)
429            } else {
430                None
431            }
432        };
433
434    let source_refresh_mode =
435        if let Some(source_refresh_mode_str) = with_options.remove(SOURCE_REFRESH_MODE_KEY) {
436            match source_refresh_mode_str.to_uppercase().as_str() {
437                "STREAMING" => {
438                    if source_refresh_interval_sec.is_some() {
439                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
440                            "`{}` is not allowed when `{}` is 'STREAMING'",
441                            SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
442                        ))));
443                    }
444                    SourceRefreshMode {
445                        refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
446                    }
447                }
448                "FULL_RELOAD" => SourceRefreshMode {
449                    refresh_mode: Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
450                        refresh_interval_sec: source_refresh_interval_sec,
451                    })),
452                },
453                _ => {
454                    return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
455                        "Invalid key `{}`: {}, accepted values are 'STREAMING' and 'FULL_RELOAD'",
456                        SOURCE_REFRESH_MODE_KEY, source_refresh_mode_str
457                    ))));
458                }
459            }
460        } else {
461            // also check the `refresh_interval_sec` is not provided when `refresh_mode` is not provided
462            if source_refresh_interval_sec.is_some() {
463                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
464                    "`{}` is not allowed when `{}` is not 'FULL_RELOAD'",
465                    SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
466                ))));
467            }
468            return Ok(None);
469        };
470    Ok(Some(source_refresh_mode))
471}
472
473pub(crate) fn resolve_privatelink_in_with_option(
474    with_options: &mut WithOptions,
475) -> RwResult<Option<ConnectionId>> {
476    let is_kafka = with_options.is_kafka_connector();
477    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
478
479    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
480    if let Some(endpoint) = privatelink_endpoint {
481        if !is_kafka {
482            return Err(RwError::from(ErrorCode::ProtocolError(
483                "Privatelink is only supported in kafka connector".to_owned(),
484            )));
485        }
486        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
487            .map_err(RwError::from)?;
488    }
489    Ok(None)
490}
491
492impl TryFrom<&[SqlOption]> for WithOptions {
493    type Error = RwError;
494
495    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
496        let mut inner: BTreeMap<String, String> = BTreeMap::new();
497        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
498        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
499        let mut backfill_order_strategy = BackfillOrderStrategy::Default;
500        for option in options {
501            let key = option.name.real_value();
502            match &option.value {
503                SqlOptionValue::SecretRef(r) => {
504                    if secret_ref.insert(key.clone(), r.clone()).is_some()
505                        || inner.contains_key(&key)
506                    {
507                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
508                            "Duplicated option: {}",
509                            key
510                        ))));
511                    }
512                    continue;
513                }
514                SqlOptionValue::ConnectionRef(r) => {
515                    if connection_ref.insert(key.clone(), r.clone()).is_some()
516                        || inner.contains_key(&key)
517                    {
518                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
519                            "Duplicated option: {}",
520                            key
521                        ))));
522                    }
523                    continue;
524                }
525                SqlOptionValue::BackfillOrder(b) => {
526                    backfill_order_strategy = b.clone();
527                    continue;
528                }
529                _ => {}
530            }
531            let value: String = match option.value.clone() {
532                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
533                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
534                SqlOptionValue::Value(Value::Number(n)) => n,
535                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
536                _ => {
537                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
538                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
539                            .to_owned(),
540                    )))
541                }
542            };
543            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
544                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
545                    "Duplicated option: {}",
546                    key
547                ))));
548            }
549        }
550
551        Ok(Self {
552            inner,
553            secret_ref,
554            connection_ref,
555            backfill_order_strategy,
556        })
557    }
558}
559
560impl TryFrom<&Statement> for WithOptions {
561    type Error = RwError;
562
563    /// Extract options from the `WITH` clause from the given statement.
564    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
565        match statement {
566            // Explain: forward to the inner statement.
567            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
568
569            // View
570            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
571
572            // Sink
573            Statement::CreateSink {
574                stmt:
575                    CreateSinkStatement {
576                        with_properties, ..
577                    },
578            }
579            | Statement::CreateConnection {
580                stmt:
581                    CreateConnectionStatement {
582                        with_properties, ..
583                    },
584            }
585            | Statement::CreateSource {
586                stmt:
587                    CreateSourceStatement {
588                        with_properties, ..
589                    },
590                ..
591            }
592            | Statement::CreateSubscription {
593                stmt:
594                    CreateSubscriptionStatement {
595                        with_properties, ..
596                    },
597                ..
598            }
599            | Statement::CreateIndex {
600                with_properties, ..
601            } => Self::try_from(with_properties.0.as_slice()),
602            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
603
604            _ => Ok(Default::default()),
605        }
606    }
607}