risingwave_frontend/utils/
with_options.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::secret::PbSecretRef;
30use risingwave_pb::secret::secret_ref::PbRefAsType;
31use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
32use risingwave_sqlparser::ast::{
33    BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
34    CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
35    SqlOptionValue, Statement, Value,
36};
37
38use super::OverwriteOptions;
39use crate::Binder;
40use crate::error::{ErrorCode, Result as RwResult, RwError};
41use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
42use crate::session::SessionImpl;
43use crate::telemetry::report_event;
44
/// Keys of the top-level options that are recognized in the `WITH` clause.
pub mod options {
    /// Option key holding the retention time, in seconds.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
48
49/// Options or properties extracted from the `WITH` clause of DDLs.
50#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
51pub struct WithOptions {
52    inner: BTreeMap<String, String>,
53    secret_ref: BTreeMap<String, SecretRefValue>,
54    connection_ref: BTreeMap<String, ConnectionRefValue>,
55    backfill_order_strategy: BackfillOrderStrategy,
56}
57
58impl GetKeyIter for WithOptions {
59    fn key_iter(&self) -> impl Iterator<Item = &str> {
60        self.inner.keys().map(|s| s.as_str())
61    }
62}
63
64impl Get for WithOptions {
65    fn get(&self, key: &str) -> Option<&String> {
66        self.inner.get(key)
67    }
68}
69
70impl std::ops::Deref for WithOptions {
71    type Target = BTreeMap<String, String>;
72
73    fn deref(&self) -> &Self::Target {
74        &self.inner
75    }
76}
77
78impl std::ops::DerefMut for WithOptions {
79    fn deref_mut(&mut self) -> &mut Self::Target {
80        &mut self.inner
81    }
82}
83
84impl WithOptions {
85    /// Create a new [`WithOptions`] from a [`BTreeMap`].
86    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
87        Self {
88            inner,
89            secret_ref: Default::default(),
90            connection_ref: Default::default(),
91            backfill_order_strategy: Default::default(),
92        }
93    }
94
95    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
96    pub fn new(
97        inner: BTreeMap<String, String>,
98        secret_ref: BTreeMap<String, SecretRefValue>,
99        connection_ref: BTreeMap<String, ConnectionRefValue>,
100    ) -> Self {
101        Self {
102            inner,
103            secret_ref,
104            connection_ref,
105            backfill_order_strategy: Default::default(),
106        }
107    }
108
109    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
110        &mut self.inner
111    }
112
113    /// Take the value of the option map and secret refs.
114    pub fn into_parts(
115        self,
116    ) -> (
117        BTreeMap<String, String>,
118        BTreeMap<String, SecretRefValue>,
119        BTreeMap<String, ConnectionRefValue>,
120    ) {
121        (self.inner, self.secret_ref, self.connection_ref)
122    }
123
124    /// Convert to connector props, remove the key-value pairs used in the top-level.
125    pub fn into_connector_props(self) -> WithOptions {
126        let inner = self
127            .inner
128            .into_iter()
129            .filter(|(key, _)| {
130                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
131            })
132            .collect();
133
134        Self {
135            inner,
136            secret_ref: self.secret_ref,
137            connection_ref: self.connection_ref,
138            backfill_order_strategy: self.backfill_order_strategy,
139        }
140    }
141
142    /// Parse the retention seconds from the options.
143    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
144        self.inner
145            .get(options::RETENTION_SECONDS)
146            .and_then(|s| s.parse().ok())
147    }
148
149    /// Get a subset of the options from the given keys.
150    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
151        let inner = keys
152            .into_iter()
153            .filter_map(|k| {
154                self.inner
155                    .get_key_value(k.as_ref())
156                    .map(|(k, v)| (k.clone(), v.clone()))
157            })
158            .collect();
159
160        Self {
161            inner,
162            secret_ref: self.secret_ref.clone(),
163            connection_ref: self.connection_ref.clone(),
164            backfill_order_strategy: self.backfill_order_strategy.clone(),
165        }
166    }
167
168    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
169        if let Some(inner_val) = self.inner.get(key) {
170            if inner_val.eq_ignore_ascii_case(val) {
171                return true;
172            }
173        }
174        false
175    }
176
177    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
178        &self.secret_ref
179    }
180
181    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
182        &mut self.secret_ref
183    }
184
185    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
186        &self.connection_ref
187    }
188
189    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
190        &mut self.connection_ref
191    }
192
193    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
194        let WithOptions {
195            inner, secret_ref, ..
196        } = WithOptions::try_from(sql_options)?;
197        if secret_ref.is_empty() {
198            Ok(inner)
199        } else {
200            Err(RwError::from(ErrorCode::InvalidParameterValue(
201                "Secret reference is not allowed in OAuth options".to_owned(),
202            )))
203        }
204    }
205
206    pub fn is_source_connector(&self) -> bool {
207        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
208            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
209    }
210
211    pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
212        self.backfill_order_strategy.clone()
213    }
214}
215
216pub(crate) fn resolve_connection_ref_and_secret_ref(
217    with_options: WithOptions,
218    session: &SessionImpl,
219    object: Option<TelemetryDatabaseObject>,
220) -> RwResult<(WithOptionsSecResolved, PbConnectionType, Option<u32>)> {
221    let connector_name = with_options.get_connector();
222    let db_name: &str = &session.database();
223    let (mut options, secret_refs, connection_refs) = with_options.clone().into_parts();
224
225    let mut connection_id = None;
226    let mut connection_params = None;
227    for connection_ref in connection_refs.values() {
228        // at most one connection ref in the map
229        connection_params = {
230            // get connection params from catalog
231            let (schema_name, connection_name) = Binder::resolve_schema_qualified_name(
232                db_name,
233                connection_ref.connection_name.clone(),
234            )?;
235            let connection_catalog =
236                session.get_connection_by_name(schema_name, &connection_name)?;
237            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
238                connection_id = Some(connection_catalog.id);
239                Some(params.clone())
240            } else {
241                return Err(RwError::from(ErrorCode::InvalidParameterValue(
242                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
243                )));
244            }
245        };
246
247        if let Some(object) = object {
248            // report to telemetry
249            report_event(
250                PbTelemetryEventStage::CreateStreamJob,
251                "connection_ref",
252                0,
253                connector_name.clone(),
254                Some(object),
255                {
256                    connection_params.as_ref().map(|cp| {
257                        jsonbb::json!({
258                            "connection_type": cp.connection_type().as_str_name().to_owned()
259                        })
260                    })
261                },
262            );
263        }
264    }
265
266    let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
267
268    let mut connection_type = PbConnectionType::Unspecified;
269    let connection_params_is_none_flag = connection_params.is_none();
270
271    if let Some(connection_params) = connection_params {
272        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
273        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
274        //
275        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
276        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
277        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
278
279        if let Some(broker_rewrite_map) = connection_params
280            .get_properties()
281            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
282        {
283            if options.contains_key(PRIVATE_LINK_TARGETS_KEY)
284                || options.contains_key(PRIVATELINK_ENDPOINT_KEY)
285            {
286                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
287                    "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
288                    broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
289                ))));
290            }
291        }
292
293        connection_type = connection_params.connection_type();
294        for (k, v) in connection_params.properties {
295            if options.insert(k.clone(), v).is_some() {
296                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
297                    "Duplicated key in both WITH clause and Connection catalog: {}",
298                    k
299                ))));
300            }
301        }
302
303        for (k, v) in connection_params.secret_refs {
304            if inner_secret_refs.insert(k.clone(), v).is_some() {
305                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
306                    "Duplicated key in both WITH clause and Connection catalog: {}",
307                    k
308                ))));
309            }
310        }
311
312        {
313            // check if not mess up with schema registry connection and glue connection
314            if connection_type == PbConnectionType::SchemaRegistry {
315                // Check no AWS related options when using schema registry connection
316                if options
317                    .keys()
318                    .chain(inner_secret_refs.keys())
319                    .any(|k| k.starts_with("aws"))
320                {
321                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
322                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
323                        )));
324                }
325            }
326        }
327    }
328
329    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
330    if connection_params_is_none_flag {
331        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
332    }
333    Ok((
334        WithOptionsSecResolved::new(options, inner_secret_refs),
335        connection_type,
336        connection_id,
337    ))
338}
339
340/// Get the secret id from the name.
341pub(crate) fn resolve_secret_ref_in_with_options(
342    with_options: WithOptions,
343    session: &SessionImpl,
344) -> RwResult<WithOptionsSecResolved> {
345    let (options, secret_refs, _) = with_options.into_parts();
346    let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
347    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
348}
349
350fn resolve_secret_refs_inner(
351    secret_refs: BTreeMap<String, SecretRefValue>,
352    session: &SessionImpl,
353) -> RwResult<BTreeMap<String, PbSecretRef>> {
354    let db_name: &str = &session.database();
355    let mut resolved_secret_refs = BTreeMap::new();
356    for (key, secret_ref) in secret_refs {
357        let (schema_name, secret_name) =
358            Binder::resolve_schema_qualified_name(db_name, secret_ref.secret_name.clone())?;
359        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
360        let ref_as = match secret_ref.ref_as {
361            SecretRefAsType::Text => PbRefAsType::Text,
362            SecretRefAsType::File => PbRefAsType::File,
363        };
364        let pb_secret_ref = PbSecretRef {
365            secret_id: secret_catalog.id.secret_id(),
366            ref_as: ref_as.into(),
367        };
368        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
369    }
370    Ok(resolved_secret_refs)
371}
372
373pub(crate) fn resolve_privatelink_in_with_option(
374    with_options: &mut WithOptions,
375) -> RwResult<Option<ConnectionId>> {
376    let is_kafka = with_options.is_kafka_connector();
377    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
378
379    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
380    if let Some(endpoint) = privatelink_endpoint {
381        if !is_kafka {
382            return Err(RwError::from(ErrorCode::ProtocolError(
383                "Privatelink is only supported in kafka connector".to_owned(),
384            )));
385        }
386        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
387            .map_err(RwError::from)?;
388    }
389    Ok(None)
390}
391
392impl TryFrom<&[SqlOption]> for WithOptions {
393    type Error = RwError;
394
395    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
396        let mut inner: BTreeMap<String, String> = BTreeMap::new();
397        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
398        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
399        let mut backfill_order_strategy = BackfillOrderStrategy::Default;
400        for option in options {
401            let key = option.name.real_value();
402            match &option.value {
403                SqlOptionValue::SecretRef(r) => {
404                    if secret_ref.insert(key.clone(), r.clone()).is_some()
405                        || inner.contains_key(&key)
406                    {
407                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
408                            "Duplicated option: {}",
409                            key
410                        ))));
411                    }
412                    continue;
413                }
414                SqlOptionValue::ConnectionRef(r) => {
415                    if connection_ref.insert(key.clone(), r.clone()).is_some()
416                        || inner.contains_key(&key)
417                    {
418                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
419                            "Duplicated option: {}",
420                            key
421                        ))));
422                    }
423                    continue;
424                }
425                SqlOptionValue::BackfillOrder(b) => {
426                    backfill_order_strategy = b.clone();
427                    continue;
428                }
429                _ => {}
430            }
431            let value: String = match option.value.clone() {
432                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
433                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
434                SqlOptionValue::Value(Value::Number(n)) => n,
435                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
436                _ => {
437                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
438                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
439                            .to_owned(),
440                    )))
441                }
442            };
443            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
444                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
445                    "Duplicated option: {}",
446                    key
447                ))));
448            }
449        }
450
451        Ok(Self {
452            inner,
453            secret_ref,
454            connection_ref,
455            backfill_order_strategy,
456        })
457    }
458}
459
460impl TryFrom<&Statement> for WithOptions {
461    type Error = RwError;
462
463    /// Extract options from the `WITH` clause from the given statement.
464    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
465        match statement {
466            // Explain: forward to the inner statement.
467            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),
468
469            // View
470            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),
471
472            // Sink
473            Statement::CreateSink {
474                stmt:
475                    CreateSinkStatement {
476                        with_properties, ..
477                    },
478            }
479            | Statement::CreateConnection {
480                stmt:
481                    CreateConnectionStatement {
482                        with_properties, ..
483                    },
484            } => Self::try_from(with_properties.0.as_slice()),
485            Statement::CreateSource {
486                stmt:
487                    CreateSourceStatement {
488                        with_properties, ..
489                    },
490                ..
491            } => Self::try_from(with_properties.0.as_slice()),
492            Statement::CreateSubscription {
493                stmt:
494                    CreateSubscriptionStatement {
495                        with_properties, ..
496                    },
497                ..
498            } => Self::try_from(with_properties.0.as_slice()),
499            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),
500
501            _ => Ok(Default::default()),
502        }
503    }
504}