risingwave_frontend/utils/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::plan_common::SourceRefreshMode;
30use risingwave_pb::plan_common::source_refresh_mode::{
31    RefreshMode, SourceRefreshModeFullRecompute, SourceRefreshModeStreaming,
32};
33use risingwave_pb::secret::PbSecretRef;
34use risingwave_pb::secret::secret_ref::PbRefAsType;
35use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
36use risingwave_sqlparser::ast::{
37    BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
38    CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
39    SqlOptionValue, Statement, Value,
40};
41
42use super::OverwriteOptions;
43use crate::Binder;
44use crate::error::{ErrorCode, Result as RwResult, RwError};
45use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
46use crate::session::SessionImpl;
47use crate::telemetry::report_event;
48
/// Names of `WITH` options that this file refers to by constant.
pub mod options {
    /// Option key read by `WithOptions::retention_seconds` and stripped by
    /// `WithOptions::into_connector_props`.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}

/// Option key consumed by `resolve_source_refresh_mode_in_with_option`.
pub const SOURCE_REFRESH_MODE_KEY: &str = "refresh_mode";
54
/// Options or properties extracted from the `WITH` clause of DDLs.
#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WithOptions {
    /// Plain `key = value` options. This is the map exposed through the
    /// `Deref`/`DerefMut`, `Get` and `GetKeyIter` implementations.
    inner: BTreeMap<String, String>,
    /// Options whose value is a secret reference, keyed by option name.
    secret_ref: BTreeMap<String, SecretRefValue>,
    /// Options whose value is a connection reference, keyed by option name.
    connection_ref: BTreeMap<String, ConnectionRefValue>,
    /// Backfill order strategy; only the `TryFrom<&[SqlOption]>` conversion sets
    /// it to something other than the default.
    backfill_order_strategy: BackfillOrderStrategy,
}
63
64impl GetKeyIter for WithOptions {
65    fn key_iter(&self) -> impl Iterator<Item = &str> {
66        self.inner.keys().map(|s| s.as_str())
67    }
68}
69
70impl Get for WithOptions {
71    fn get(&self, key: &str) -> Option<&String> {
72        self.inner.get(key)
73    }
74}
75
76impl std::ops::Deref for WithOptions {
77    type Target = BTreeMap<String, String>;
78
79    fn deref(&self) -> &Self::Target {
80        &self.inner
81    }
82}
83
84impl std::ops::DerefMut for WithOptions {
85    fn deref_mut(&mut self) -> &mut Self::Target {
86        &mut self.inner
87    }
88}
89
90impl WithOptions {
91    /// Create a new [`WithOptions`] from a [`BTreeMap`].
92    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
93        Self {
94            inner,
95            secret_ref: Default::default(),
96            connection_ref: Default::default(),
97            backfill_order_strategy: Default::default(),
98        }
99    }
100
101    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
102    pub fn new(
103        inner: BTreeMap<String, String>,
104        secret_ref: BTreeMap<String, SecretRefValue>,
105        connection_ref: BTreeMap<String, ConnectionRefValue>,
106    ) -> Self {
107        Self {
108            inner,
109            secret_ref,
110            connection_ref,
111            backfill_order_strategy: Default::default(),
112        }
113    }
114
115    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
116        &mut self.inner
117    }
118
119    /// Take the value of the option map and secret refs.
120    pub fn into_parts(
121        self,
122    ) -> (
123        BTreeMap<String, String>,
124        BTreeMap<String, SecretRefValue>,
125        BTreeMap<String, ConnectionRefValue>,
126    ) {
127        (self.inner, self.secret_ref, self.connection_ref)
128    }
129
130    /// Convert to connector props, remove the key-value pairs used in the top-level.
131    pub fn into_connector_props(self) -> WithOptions {
132        let inner = self
133            .inner
134            .into_iter()
135            .filter(|(key, _)| {
136                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
137            })
138            .collect();
139
140        Self {
141            inner,
142            secret_ref: self.secret_ref,
143            connection_ref: self.connection_ref,
144            backfill_order_strategy: self.backfill_order_strategy,
145        }
146    }
147
148    /// Parse the retention seconds from the options.
149    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
150        self.inner
151            .get(options::RETENTION_SECONDS)
152            .and_then(|s| s.parse().ok())
153    }
154
155    /// Get a subset of the options from the given keys.
156    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
157        let inner = keys
158            .into_iter()
159            .filter_map(|k| {
160                self.inner
161                    .get_key_value(k.as_ref())
162                    .map(|(k, v)| (k.clone(), v.clone()))
163            })
164            .collect();
165
166        Self {
167            inner,
168            secret_ref: self.secret_ref.clone(),
169            connection_ref: self.connection_ref.clone(),
170            backfill_order_strategy: self.backfill_order_strategy.clone(),
171        }
172    }
173
174    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
175        if let Some(inner_val) = self.inner.get(key)
176            && inner_val.eq_ignore_ascii_case(val)
177        {
178            return true;
179        }
180        false
181    }
182
183    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
184        &self.secret_ref
185    }
186
187    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
188        &mut self.secret_ref
189    }
190
191    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
192        &self.connection_ref
193    }
194
195    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
196        &mut self.connection_ref
197    }
198
199    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
200        let WithOptions {
201            inner, secret_ref, ..
202        } = WithOptions::try_from(sql_options)?;
203        if secret_ref.is_empty() {
204            Ok(inner)
205        } else {
206            Err(RwError::from(ErrorCode::InvalidParameterValue(
207                "Secret reference is not allowed in OAuth options".to_owned(),
208            )))
209        }
210    }
211
212    pub fn is_source_connector(&self) -> bool {
213        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
214            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
215    }
216
217    pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
218        self.backfill_order_strategy.clone()
219    }
220}
221
222/// Resolves connection references and secret references in `WithOptions`.
223///
224/// This function takes `WithOptions` (typically from a CREATE/ALTER statement),
225/// resolves any CONNECTION and SECRET references, and merges their properties
226/// with the directly specified options.
227///
228/// # Arguments
229/// * `with_options` - The original `WithOptions` containing user-specified options and references
230/// * `session` - The current user session, used to access catalogs and verify permissions
231/// * `object` - Optional telemetry information about the database object being created/altered
232///
233/// # Returns
234/// A tuple containing:
235/// * `WithOptionsSecResolved` - `WithOptions` with all references resolved and merged
236/// * `PbConnectionType` - The type of the referenced connection (if any)
237/// * `Option<u32>` - The ID of the referenced connection (if any)
238///
239/// # Workflow
240/// 1. Extract connector name, options, secret refs, and connection refs from `with_options`
241/// 2. Resolve any CONNECTION references by looking them up in the catalog
242/// 3. Resolve any SECRET references by looking them up in the catalog
243/// 4. Merge properties from CONNECTION with directly specified options
244/// 5. Perform validation to ensure no conflicts between options
245/// 6. Return the merged options, connection type, and connection ID
246pub(crate) fn resolve_connection_ref_and_secret_ref(
247    with_options: WithOptions,
248    session: &SessionImpl,
249    object: Option<TelemetryDatabaseObject>,
250) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
251    let connector_name = with_options.get_connector();
252    let db_name: &str = &session.database();
253    let (mut options, secret_refs, connection_refs) = with_options.into_parts();
254
255    let mut connection_id = None;
256    let mut connection_params = None;
257    for connection_ref in connection_refs.values() {
258        // at most one connection ref in the map
259        connection_params = {
260            // get connection params from catalog
261            let (schema_name, connection_name) =
262                Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
263            let connection_catalog =
264                session.get_connection_by_name(schema_name, &connection_name)?;
265            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
266                connection_id = Some(connection_catalog.id);
267                Some(params.clone())
268            } else {
269                return Err(RwError::from(ErrorCode::InvalidParameterValue(
270                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
271                )));
272            }
273        };
274
275        // Report telemetry event for connection reference if an object is provided
276        if let Some(object) = object {
277            // report to telemetry
278            report_event(
279                PbTelemetryEventStage::CreateStreamJob,
280                "connection_ref",
281                0,
282                connector_name.clone(),
283                Some(object),
284                {
285                    connection_params.as_ref().map(|cp| {
286                        jsonbb::json!({
287                            "connection_type": cp.connection_type().as_str_name().to_owned()
288                        })
289                    })
290                },
291            );
292        }
293    }
294
295    let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
296
297    // Initialize connection_type to Unspecified (will be updated if connection is used)
298    let mut connection_type = PbConnectionType::Unspecified;
299    let connection_params_is_none_flag = connection_params.is_none();
300
301    // If we have a connection, merge its properties with directly specified options
302    if let Some(connection_params) = connection_params {
303        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
304        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
305        //
306        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
307        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
308        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
309
310        if let Some(broker_rewrite_map) = connection_params
311            .get_properties()
312            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
313            && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
314                || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
315        {
316            return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
317                "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
318                broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
319            ))));
320        }
321
322        // Extract connection type and merge properties
323        connection_type = connection_params.connection_type();
324        for (k, v) in connection_params.properties {
325            if options.insert(k.clone(), v).is_some() {
326                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
327                    "Duplicated key in both WITH clause and Connection catalog: {}",
328                    k
329                ))));
330            }
331        }
332
333        // Merge secret references from connection
334        for (k, v) in connection_params.secret_refs {
335            if inner_secret_refs.insert(k.clone(), v).is_some() {
336                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
337                    "Duplicated key in both WITH clause and Connection catalog: {}",
338                    k
339                ))));
340            }
341        }
342
343        // Schema Registry specific validation
344        {
345            if connection_type == PbConnectionType::SchemaRegistry {
346                // Ensure no AWS/Glue options are provided when using schema registry connection
347                if options
348                    .keys()
349                    .chain(inner_secret_refs.keys())
350                    .any(|k| k.starts_with("aws"))
351                {
352                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
353                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
354                        )));
355                }
356            }
357        }
358    }
359
360    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
361    if connection_params_is_none_flag {
362        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
363    }
364    Ok((
365        WithOptionsSecResolved::new(options, inner_secret_refs),
366        connection_type,
367        connection_id,
368    ))
369}
370
371/// Get the secret id from the name.
372pub(crate) fn resolve_secret_ref_in_with_options(
373    with_options: WithOptions,
374    session: &SessionImpl,
375) -> RwResult<WithOptionsSecResolved> {
376    let (options, secret_refs, _) = with_options.into_parts();
377    let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
378    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
379}
380
381fn resolve_secret_refs_inner(
382    secret_refs: BTreeMap<String, SecretRefValue>,
383    session: &SessionImpl,
384) -> RwResult<BTreeMap<String, PbSecretRef>> {
385    let db_name: &str = &session.database();
386    let mut resolved_secret_refs = BTreeMap::new();
387    for (key, secret_ref) in secret_refs {
388        let (schema_name, secret_name) =
389            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
390        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
391        let ref_as = match secret_ref.ref_as {
392            SecretRefAsType::Text => PbRefAsType::Text,
393            SecretRefAsType::File => PbRefAsType::File,
394        };
395        let pb_secret_ref = PbSecretRef {
396            secret_id: secret_catalog.id.secret_id(),
397            ref_as: ref_as.into(),
398        };
399        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
400    }
401    Ok(resolved_secret_refs)
402}
403
404pub(crate) fn resolve_source_refresh_mode_in_with_option(
405    with_options: &mut WithOptions,
406) -> RwResult<Option<SourceRefreshMode>> {
407    let source_refresh_mode = if let Some(source_refresh_mode_str) =
408        with_options.remove(SOURCE_REFRESH_MODE_KEY)
409    {
410        match source_refresh_mode_str.to_uppercase().as_str() {
411            "STREAMING" => SourceRefreshMode {
412                refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
413            },
414            "FULL_RECOMPUTE" => SourceRefreshMode {
415                refresh_mode: Some(RefreshMode::FullRecompute(
416                    SourceRefreshModeFullRecompute {},
417                )),
418            },
419            _ => {
420                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
421                    "Invalid key `{}`: {}, accepted values are 'STREAMING' and 'FULL_RECOMPUTE'",
422                    SOURCE_REFRESH_MODE_KEY, source_refresh_mode_str
423                ))));
424            }
425        }
426    } else {
427        return Ok(None);
428    };
429    Ok(Some(source_refresh_mode))
430}
431
432pub(crate) fn resolve_privatelink_in_with_option(
433    with_options: &mut WithOptions,
434) -> RwResult<Option<ConnectionId>> {
435    let is_kafka = with_options.is_kafka_connector();
436    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
437
438    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
439    if let Some(endpoint) = privatelink_endpoint {
440        if !is_kafka {
441            return Err(RwError::from(ErrorCode::ProtocolError(
442                "Privatelink is only supported in kafka connector".to_owned(),
443            )));
444        }
445        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
446            .map_err(RwError::from)?;
447    }
448    Ok(None)
449}
450
451impl TryFrom<&[SqlOption]> for WithOptions {
452    type Error = RwError;
453
454    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
455        let mut inner: BTreeMap<String, String> = BTreeMap::new();
456        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
457        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
458        let mut backfill_order_strategy = BackfillOrderStrategy::Default;
459        for option in options {
460            let key = option.name.real_value();
461            match &option.value {
462                SqlOptionValue::SecretRef(r) => {
463                    if secret_ref.insert(key.clone(), r.clone()).is_some()
464                        || inner.contains_key(&key)
465                    {
466                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
467                            "Duplicated option: {}",
468                            key
469                        ))));
470                    }
471                    continue;
472                }
473                SqlOptionValue::ConnectionRef(r) => {
474                    if connection_ref.insert(key.clone(), r.clone()).is_some()
475                        || inner.contains_key(&key)
476                    {
477                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
478                            "Duplicated option: {}",
479                            key
480                        ))));
481                    }
482                    continue;
483                }
484                SqlOptionValue::BackfillOrder(b) => {
485                    backfill_order_strategy = b.clone();
486                    continue;
487                }
488                _ => {}
489            }
490            let value: String = match option.value.clone() {
491                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
492                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
493                SqlOptionValue::Value(Value::Number(n)) => n,
494                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
495                _ => {
496                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
497                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
498                            .to_owned(),
499                    )))
500                }
501            };
502            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
503                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
504                    "Duplicated option: {}",
505                    key
506                ))));
507            }
508        }
509
510        Ok(Self {
511            inner,
512            secret_ref,
513            connection_ref,
514            backfill_order_strategy,
515        })
516    }
517}
518
519impl TryFrom<&Statement> for WithOptions {
520    type Error = RwError;
521
522    /// Extract options from the `WITH` clause from the given statement.
523    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
524        match statement {
525            // Explain: forward to the inner statement.
526            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
527
528            // View
529            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
530
531            // Sink
532            Statement::CreateSink {
533                stmt:
534                    CreateSinkStatement {
535                        with_properties, ..
536                    },
537            }
538            | Statement::CreateConnection {
539                stmt:
540                    CreateConnectionStatement {
541                        with_properties, ..
542                    },
543            }
544            | Statement::CreateSource {
545                stmt:
546                    CreateSourceStatement {
547                        with_properties, ..
548                    },
549                ..
550            }
551            | Statement::CreateSubscription {
552                stmt:
553                    CreateSubscriptionStatement {
554                        with_properties, ..
555                    },
556                ..
557            }
558            | Statement::CreateIndex {
559                with_properties, ..
560            } => Self::try_from(with_properties.0.as_slice()),
561            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
562
563            _ => Ok(Default::default()),
564        }
565    }
566}