risingwave_frontend/utils/
with_options.rs

1// Copyright 2022 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16use std::num::NonZeroU32;
17
18use risingwave_common::catalog::ConnectionId;
19pub use risingwave_connector::WithOptionsSecResolved;
20use risingwave_connector::connector_common::{
21    PRIVATE_LINK_BROKER_REWRITE_MAP_KEY, PRIVATE_LINK_TARGETS_KEY,
22};
23use risingwave_connector::source::kafka::private_link::{
24    PRIVATELINK_ENDPOINT_KEY, insert_privatelink_broker_rewrite_map,
25};
26use risingwave_connector::{Get, GetKeyIter, WithPropertiesExt};
27use risingwave_pb::catalog::connection::Info as ConnectionInfo;
28use risingwave_pb::catalog::connection_params::PbConnectionType;
29use risingwave_pb::plan_common::SourceRefreshMode;
30use risingwave_pb::plan_common::source_refresh_mode::{
31    RefreshMode, SourceRefreshModeFullReload, SourceRefreshModeStreaming,
32};
33use risingwave_pb::secret::PbSecretRef;
34use risingwave_pb::secret::secret_ref::PbRefAsType;
35use risingwave_pb::telemetry::{PbTelemetryEventStage, TelemetryDatabaseObject};
36use risingwave_sqlparser::ast::{
37    BackfillOrderStrategy, ConnectionRefValue, CreateConnectionStatement, CreateSinkStatement,
38    CreateSourceStatement, CreateSubscriptionStatement, SecretRefAsType, SecretRefValue, SqlOption,
39    SqlOptionValue, Statement, Value,
40};
41use thiserror_ext::AsReport;
42
43use super::OverwriteOptions;
44use crate::Binder;
45use crate::error::{ErrorCode, Result as RwResult, RwError};
46use crate::handler::create_source::{UPSTREAM_SOURCE_KEY, WEBHOOK_CONNECTOR};
47use crate::session::SessionImpl;
48use crate::telemetry::report_event;
49
/// Well-known option keys recognized in the `WITH` clause.
pub mod options {
    /// Key of the retention option; its value is parsed as a non-zero number of seconds
    /// by `WithOptions::retention_seconds`.
    pub const RETENTION_SECONDS: &str = "retention_seconds";
}
53
/// Option key read by `WithOptions::refresh_interval_sec`; the value must parse as a `u64`
/// larger than 0.
pub const MV_REFRESH_INTERVAL_SEC_KEY: &str = "refresh.interval.sec";
/// Option key selecting the source refresh mode; accepted values are 'STREAMING' and
/// 'FULL_RELOAD' (compared after upper-casing).
pub const SOURCE_REFRESH_MODE_KEY: &str = "refresh_mode";
/// Option key for the source refresh interval in seconds; only accepted together with
/// the 'FULL_RELOAD' refresh mode.
pub const SOURCE_REFRESH_INTERVAL_SEC_KEY: &str = "refresh_interval_sec";
57
/// Options or properties extracted from the `WITH` clause of DDLs.
///
/// The struct dereferences to the plain option map (`inner`), so map methods such as
/// `get`, `remove` or `contains_key` operate on the plain options only.
#[derive(Default, Clone, Debug, PartialEq, Eq, Hash)]
pub struct WithOptions {
    /// Plain options, with every value stored as a string.
    inner: BTreeMap<String, String>,
    /// Options whose value is a secret reference, keyed by option name.
    secret_ref: BTreeMap<String, SecretRefValue>,
    /// Options whose value is a connection reference, keyed by option name.
    connection_ref: BTreeMap<String, ConnectionRefValue>,
    /// Backfill order strategy given in the options; the type's default when none was given.
    backfill_order_strategy: BackfillOrderStrategy,
}
66
67impl GetKeyIter for WithOptions {
68    fn key_iter(&self) -> impl Iterator<Item = &str> {
69        self.inner.keys().map(|s| s.as_str())
70    }
71}
72
impl Get for WithOptions {
    /// Looks up `key` among the plain options only; secret and connection references
    /// are not consulted.
    fn get(&self, key: &str) -> Option<&String> {
        self.inner.get(key)
    }
}
78
// Exposes the plain option map so read-only `BTreeMap` methods can be called directly
// on a `WithOptions`.
impl std::ops::Deref for WithOptions {
    type Target = BTreeMap<String, String>;

    /// Returns a shared reference to the plain option map.
    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}
86
// Exposes the plain option map mutably so `BTreeMap` methods such as `insert` and
// `remove` can be called directly on a `WithOptions`.
impl std::ops::DerefMut for WithOptions {
    /// Returns a mutable reference to the plain option map.
    fn deref_mut(&mut self) -> &mut Self::Target {
        &mut self.inner
    }
}
92
93impl WithOptions {
94    /// Create a new [`WithOptions`] from a [`BTreeMap`].
95    pub fn new_with_options(inner: BTreeMap<String, String>) -> Self {
96        Self {
97            inner,
98            secret_ref: Default::default(),
99            connection_ref: Default::default(),
100            backfill_order_strategy: Default::default(),
101        }
102    }
103
104    /// Create a new [`WithOptions`] from a option [`BTreeMap`] and secret ref.
105    pub fn new(
106        inner: BTreeMap<String, String>,
107        secret_ref: BTreeMap<String, SecretRefValue>,
108        connection_ref: BTreeMap<String, ConnectionRefValue>,
109    ) -> Self {
110        Self {
111            inner,
112            secret_ref,
113            connection_ref,
114            backfill_order_strategy: Default::default(),
115        }
116    }
117
118    pub fn inner_mut(&mut self) -> &mut BTreeMap<String, String> {
119        &mut self.inner
120    }
121
122    /// Take the value of the option map and secret refs.
123    pub fn into_parts(
124        self,
125    ) -> (
126        BTreeMap<String, String>,
127        BTreeMap<String, SecretRefValue>,
128        BTreeMap<String, ConnectionRefValue>,
129    ) {
130        (self.inner, self.secret_ref, self.connection_ref)
131    }
132
133    /// Convert to connector props, remove the key-value pairs used in the top-level.
134    pub fn into_connector_props(self) -> WithOptions {
135        let inner = self
136            .inner
137            .into_iter()
138            .filter(|(key, _)| {
139                key != OverwriteOptions::SOURCE_RATE_LIMIT_KEY && key != options::RETENTION_SECONDS
140            })
141            .collect();
142
143        Self {
144            inner,
145            secret_ref: self.secret_ref,
146            connection_ref: self.connection_ref,
147            backfill_order_strategy: self.backfill_order_strategy,
148        }
149    }
150
151    /// Parse the retention seconds from the options.
152    pub fn retention_seconds(&self) -> Option<NonZeroU32> {
153        self.inner
154            .get(options::RETENTION_SECONDS)
155            .and_then(|s| s.parse().ok())
156    }
157
158    pub fn refresh_interval_sec(&self) -> RwResult<Option<u64>> {
159        self.inner
160            .get(MV_REFRESH_INTERVAL_SEC_KEY)
161            .map(|seconds| {
162                let seconds = seconds.parse::<u64>().map_err(|e| {
163                    RwError::from(ErrorCode::InvalidParameterValue(format!(
164                        "`{}` must be a positive integer, but got: {} (error: {})",
165                        MV_REFRESH_INTERVAL_SEC_KEY,
166                        seconds,
167                        e.as_report()
168                    )))
169                })?;
170                if seconds == 0 {
171                    return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
172                        "`{}` must be larger than 0, but got: {}",
173                        MV_REFRESH_INTERVAL_SEC_KEY, seconds
174                    ))));
175                }
176                Ok(seconds)
177            })
178            .transpose()
179    }
180
181    /// Get a subset of the options from the given keys.
182    pub fn subset(&self, keys: impl IntoIterator<Item = impl AsRef<str>>) -> Self {
183        let inner = keys
184            .into_iter()
185            .filter_map(|k| {
186                self.inner
187                    .get_key_value(k.as_ref())
188                    .map(|(k, v)| (k.clone(), v.clone()))
189            })
190            .collect();
191
192        Self {
193            inner,
194            secret_ref: self.secret_ref.clone(),
195            connection_ref: self.connection_ref.clone(),
196            backfill_order_strategy: self.backfill_order_strategy.clone(),
197        }
198    }
199
200    pub fn value_eq_ignore_case(&self, key: &str, val: &str) -> bool {
201        if let Some(inner_val) = self.inner.get(key)
202            && inner_val.eq_ignore_ascii_case(val)
203        {
204            return true;
205        }
206        false
207    }
208
209    pub fn secret_ref(&self) -> &BTreeMap<String, SecretRefValue> {
210        &self.secret_ref
211    }
212
213    pub fn secret_ref_mut(&mut self) -> &mut BTreeMap<String, SecretRefValue> {
214        &mut self.secret_ref
215    }
216
217    pub fn connection_ref(&self) -> &BTreeMap<String, ConnectionRefValue> {
218        &self.connection_ref
219    }
220
221    pub fn connection_ref_mut(&mut self) -> &mut BTreeMap<String, ConnectionRefValue> {
222        &mut self.connection_ref
223    }
224
225    pub fn oauth_options_to_map(sql_options: &[SqlOption]) -> RwResult<BTreeMap<String, String>> {
226        let WithOptions {
227            inner, secret_ref, ..
228        } = WithOptions::try_from(sql_options)?;
229        if secret_ref.is_empty() {
230            Ok(inner)
231        } else {
232            Err(RwError::from(ErrorCode::InvalidParameterValue(
233                "Secret reference is not allowed in OAuth options".to_owned(),
234            )))
235        }
236    }
237
238    pub fn is_source_connector(&self) -> bool {
239        self.inner.contains_key(UPSTREAM_SOURCE_KEY)
240            && self.inner.get(UPSTREAM_SOURCE_KEY).unwrap() != WEBHOOK_CONNECTOR
241    }
242
243    pub fn backfill_order_strategy(&self) -> BackfillOrderStrategy {
244        self.backfill_order_strategy.clone()
245    }
246}
247
248/// Resolves connection references and secret references in `WithOptions`.
249///
250/// This function takes `WithOptions` (typically from a CREATE/ALTER statement),
251/// resolves any CONNECTION and SECRET references, and merges their properties
252/// with the directly specified options.
253///
254/// # Arguments
255/// * `with_options` - The original `WithOptions` containing user-specified options and references
256/// * `session` - The current user session, used to access catalogs and verify permissions
257/// * `object` - Optional telemetry information about the database object being created/altered
258///
259/// # Returns
260/// A tuple containing:
261/// * `WithOptionsSecResolved` - `WithOptions` with all references resolved and merged
262/// * `PbConnectionType` - The type of the referenced connection (if any)
263/// * `Option<u32>` - The ID of the referenced connection (if any)
264///
265/// # Workflow
266/// 1. Extract connector name, options, secret refs, and connection refs from `with_options`
267/// 2. Resolve any CONNECTION references by looking them up in the catalog
268/// 3. Resolve any SECRET references by looking them up in the catalog
269/// 4. Merge properties from CONNECTION with directly specified options
270/// 5. Perform validation to ensure no conflicts between options
271/// 6. Return the merged options, connection type, and connection ID
272pub(crate) fn resolve_connection_ref_and_secret_ref(
273    with_options: WithOptions,
274    session: &SessionImpl,
275    object: Option<TelemetryDatabaseObject>,
276) -> RwResult<(
277    WithOptionsSecResolved,
278    PbConnectionType,
279    Option<ConnectionId>,
280)> {
281    let connector_name = with_options.get_connector();
282    let db_name: &str = &session.database();
283    let (mut options, secret_refs, connection_refs) = with_options.into_parts();
284
285    let mut connection_id = None;
286    let mut connection_params = None;
287    for connection_ref in connection_refs.values() {
288        // at most one connection ref in the map
289        connection_params = {
290            // get connection params from catalog
291            let (schema_name, connection_name) =
292                Binder::resolve_schema_qualified_name(db_name, &connection_ref.connection_name)?;
293            let connection_catalog =
294                session.get_connection_by_name(schema_name, &connection_name)?;
295            if let ConnectionInfo::ConnectionParams(params) = &connection_catalog.info {
296                connection_id = Some(connection_catalog.id);
297                Some(params.clone())
298            } else {
299                return Err(RwError::from(ErrorCode::InvalidParameterValue(
300                    "Private Link Service has been deprecated. Please create a new connection instead.".to_owned(),
301                )));
302            }
303        };
304
305        // Report telemetry event for connection reference if an object is provided
306        if let Some(object) = object {
307            // report to telemetry
308            report_event(
309                PbTelemetryEventStage::CreateStreamJob,
310                "connection_ref",
311                0,
312                connector_name.clone(),
313                Some(object),
314                {
315                    connection_params.as_ref().map(|cp| {
316                        jsonbb::json!({
317                            "connection_type": cp.connection_type().as_str_name().to_owned()
318                        })
319                    })
320                },
321            );
322        }
323    }
324
325    let mut inner_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
326
327    // Initialize connection_type to Unspecified (will be updated if connection is used)
328    let mut connection_type = PbConnectionType::Unspecified;
329    let connection_params_is_none_flag = connection_params.is_none();
330
331    // If we have a connection, merge its properties with directly specified options
332    if let Some(connection_params) = connection_params {
333        // Do key checks on `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY`, `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY`
334        // `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is generated from `private_link_targets` and `private_link_endpoint`, instead of given by users.
335        //
336        // We resolve private link via `resolve_privatelink_in_with_option` when creating Connection,
337        // so here we need to check `PRIVATE_LINK_TARGETS_KEY` and `PRIVATELINK_ENDPOINT_KEY` are not given
338        // if `PRIVATE_LINK_BROKER_REWRITE_MAP_KEY` is in Connection catalog.
339
340        if let Some(broker_rewrite_map) = connection_params
341            .get_properties()
342            .get(PRIVATE_LINK_BROKER_REWRITE_MAP_KEY)
343            && (options.contains_key(PRIVATE_LINK_TARGETS_KEY)
344                || options.contains_key(PRIVATELINK_ENDPOINT_KEY))
345        {
346            return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
347                "PrivateLink related options already defined in Connection (rewrite map: {}), please remove {} and {} from WITH clause",
348                broker_rewrite_map, PRIVATE_LINK_TARGETS_KEY, PRIVATELINK_ENDPOINT_KEY
349            ))));
350        }
351
352        // Extract connection type and merge properties
353        connection_type = connection_params.connection_type();
354        for (k, v) in connection_params.properties {
355            if options.insert(k.clone(), v).is_some() {
356                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
357                    "Duplicated key in both WITH clause and Connection catalog: {}",
358                    k
359                ))));
360            }
361        }
362
363        // Merge secret references from connection
364        for (k, v) in connection_params.secret_refs {
365            if inner_secret_refs.insert(k.clone(), v).is_some() {
366                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
367                    "Duplicated key in both WITH clause and Connection catalog: {}",
368                    k
369                ))));
370            }
371        }
372
373        // Schema Registry specific validation
374        {
375            if connection_type == PbConnectionType::SchemaRegistry {
376                // Ensure no AWS/Glue options are provided when using schema registry connection
377                if options
378                    .keys()
379                    .chain(inner_secret_refs.keys())
380                    .any(|k| k.starts_with("aws"))
381                {
382                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
383                            "Glue related options/secrets are not allowed when using schema registry connection".to_owned()
384                        )));
385                }
386            }
387        }
388    }
389
390    // connection_params is None means the connection is not retrieved, so the connection type should be unspecified
391    if connection_params_is_none_flag {
392        debug_assert!(matches!(connection_type, PbConnectionType::Unspecified));
393    }
394    Ok((
395        WithOptionsSecResolved::new(options, inner_secret_refs),
396        connection_type,
397        connection_id,
398    ))
399}
400
401/// Get the secret id from the name.
402pub(crate) fn resolve_secret_ref_in_with_options(
403    with_options: WithOptions,
404    session: &SessionImpl,
405) -> RwResult<WithOptionsSecResolved> {
406    let (options, secret_refs, _) = with_options.into_parts();
407    let resolved_secret_refs = resolve_secret_refs_inner(secret_refs, session)?;
408    Ok(WithOptionsSecResolved::new(options, resolved_secret_refs))
409}
410
411fn resolve_secret_refs_inner(
412    secret_refs: BTreeMap<String, SecretRefValue>,
413    session: &SessionImpl,
414) -> RwResult<BTreeMap<String, PbSecretRef>> {
415    let db_name: &str = &session.database();
416    let mut resolved_secret_refs = BTreeMap::new();
417    for (key, secret_ref) in secret_refs {
418        let (schema_name, secret_name) =
419            Binder::resolve_schema_qualified_name(db_name, &secret_ref.secret_name)?;
420        let secret_catalog = session.get_secret_by_name(schema_name, &secret_name)?;
421        let ref_as = match secret_ref.ref_as {
422            SecretRefAsType::Text => PbRefAsType::Text,
423            SecretRefAsType::File => PbRefAsType::File,
424        };
425        let pb_secret_ref = PbSecretRef {
426            secret_id: secret_catalog.id,
427            ref_as: ref_as.into(),
428        };
429        resolved_secret_refs.insert(key.clone(), pb_secret_ref);
430    }
431    Ok(resolved_secret_refs)
432}
433
434pub(crate) fn resolve_source_refresh_mode_in_with_option(
435    with_options: &mut WithOptions,
436) -> RwResult<Option<SourceRefreshMode>> {
437    let source_refresh_interval_sec =
438        {
439            if let Some(maybe_int) = with_options.remove(SOURCE_REFRESH_INTERVAL_SEC_KEY) {
440                let some_int = maybe_int.parse::<i64>().map_err(|e| {
441                RwError::from(ErrorCode::InvalidParameterValue(format!(
442                    "`{}` must be a positive integer and larger than 0, but got: {} (error: {})",
443                    SOURCE_REFRESH_INTERVAL_SEC_KEY, maybe_int, e.as_report()
444                )))
445            })?;
446                if some_int <= 0 {
447                    return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
448                        "`{}` must be larger than 0, but got: {}",
449                        SOURCE_REFRESH_INTERVAL_SEC_KEY, some_int
450                    ))));
451                }
452                Some(some_int)
453            } else {
454                None
455            }
456        };
457
458    let mut is_full_reload = false;
459    let source_refresh_mode_str = with_options.remove(SOURCE_REFRESH_MODE_KEY);
460    let source_refresh_mode = if let Some(source_refresh_mode_str) = &source_refresh_mode_str {
461        Some(match source_refresh_mode_str.to_uppercase().as_str() {
462            "STREAMING" => {
463                if source_refresh_interval_sec.is_some() {
464                    return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
465                        "`{}` is not allowed when `{}` is 'STREAMING'",
466                        SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
467                    ))));
468                }
469                SourceRefreshMode {
470                    refresh_mode: Some(RefreshMode::Streaming(SourceRefreshModeStreaming {})),
471                }
472            }
473            "FULL_RELOAD" => {
474                is_full_reload = true;
475                SourceRefreshMode {
476                    refresh_mode: Some(RefreshMode::FullReload(SourceRefreshModeFullReload {
477                        refresh_interval_sec: source_refresh_interval_sec,
478                    })),
479                }
480            }
481            _ => {
482                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
483                    "Invalid key `{}`: {}, accepted values are 'STREAMING' and 'FULL_RELOAD'",
484                    SOURCE_REFRESH_MODE_KEY, source_refresh_mode_str
485                ))));
486            }
487        })
488    } else {
489        // also check the `refresh_interval_sec` is not provided when `refresh_mode` is not provided
490        if source_refresh_interval_sec.is_some() {
491            return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
492                "`{}` is not allowed when `{}` is not 'FULL_RELOAD'",
493                SOURCE_REFRESH_INTERVAL_SEC_KEY, SOURCE_REFRESH_MODE_KEY
494            ))));
495        }
496        None
497    };
498
499    // Batch connectors require FULL_RELOAD mode.
500    if with_options.is_batch_connector() && !is_full_reload {
501        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
502            "Refreshable source {} must be refreshed with 'FULL_RELOAD' refresh mode. Please set `refresh_mode` to 'FULL_RELOAD'.",
503            with_options.get_connector().unwrap(),
504        ))));
505    }
506    Ok(source_refresh_mode)
507}
508
509pub(crate) fn resolve_privatelink_in_with_option(
510    with_options: &mut WithOptions,
511) -> RwResult<Option<ConnectionId>> {
512    let is_kafka = with_options.is_kafka_connector();
513    let privatelink_endpoint = with_options.remove(PRIVATELINK_ENDPOINT_KEY);
514
515    // if `privatelink.endpoint` is provided in WITH, use it to rewrite broker address directly
516    if let Some(endpoint) = privatelink_endpoint {
517        if !is_kafka {
518            return Err(RwError::from(ErrorCode::ProtocolError(
519                "Privatelink is only supported in kafka connector".to_owned(),
520            )));
521        }
522        insert_privatelink_broker_rewrite_map(with_options.inner_mut(), None, Some(endpoint))
523            .map_err(RwError::from)?;
524    }
525    Ok(None)
526}
527
528impl TryFrom<&[SqlOption]> for WithOptions {
529    type Error = RwError;
530
531    fn try_from(options: &[SqlOption]) -> Result<Self, Self::Error> {
532        let mut inner: BTreeMap<String, String> = BTreeMap::new();
533        let mut secret_ref: BTreeMap<String, SecretRefValue> = BTreeMap::new();
534        let mut connection_ref: BTreeMap<String, ConnectionRefValue> = BTreeMap::new();
535        let mut backfill_order_strategy = BackfillOrderStrategy::Default;
536        for option in options {
537            let key = option.name.real_value();
538            match &option.value {
539                SqlOptionValue::SecretRef(r) => {
540                    if secret_ref.insert(key.clone(), r.clone()).is_some()
541                        || inner.contains_key(&key)
542                    {
543                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
544                            "Duplicated option: {}",
545                            key
546                        ))));
547                    }
548                    continue;
549                }
550                SqlOptionValue::ConnectionRef(r) => {
551                    if connection_ref.insert(key.clone(), r.clone()).is_some()
552                        || inner.contains_key(&key)
553                    {
554                        return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
555                            "Duplicated option: {}",
556                            key
557                        ))));
558                    }
559                    continue;
560                }
561                SqlOptionValue::BackfillOrder(b) => {
562                    backfill_order_strategy = b.clone();
563                    continue;
564                }
565                _ => {}
566            }
567            let value: String = match option.value.clone() {
568                SqlOptionValue::Value(Value::CstyleEscapedString(s)) => s.value,
569                SqlOptionValue::Value(Value::SingleQuotedString(s)) => s,
570                SqlOptionValue::Value(Value::Number(n)) => n,
571                SqlOptionValue::Value(Value::Boolean(b)) => b.to_string(),
572                _ => {
573                    return Err(RwError::from(ErrorCode::InvalidParameterValue(
574                        "`with options` or `with properties` only support single quoted string value and C style escaped string"
575                            .to_owned(),
576                    )))
577                }
578            };
579            if inner.insert(key.clone(), value).is_some() || secret_ref.contains_key(&key) {
580                return Err(RwError::from(ErrorCode::InvalidParameterValue(format!(
581                    "Duplicated option: {}",
582                    key
583                ))));
584            }
585        }
586
587        Ok(Self {
588            inner,
589            secret_ref,
590            connection_ref,
591            backfill_order_strategy,
592        })
593    }
594}
595
impl TryFrom<&Statement> for WithOptions {
    type Error = RwError;

    /// Extract options from the `WITH` clause from the given statement.
    ///
    /// Statements that are not listed below yield the default (empty) options.
    fn try_from(statement: &Statement) -> Result<Self, Self::Error> {
        match statement {
            // Explain: forward to the inner statement.
            Statement::Explain { statement, .. } => Self::try_from(statement.as_ref()),

            // View: options are stored in `with_options`.
            Statement::CreateView { with_options, .. } => Self::try_from(with_options.as_slice()),

            // Sink, connection, source, subscription and index: options are stored in
            // `with_properties`.
            Statement::CreateSink {
                stmt:
                    CreateSinkStatement {
                        with_properties, ..
                    },
            }
            | Statement::CreateConnection {
                stmt:
                    CreateConnectionStatement {
                        with_properties, ..
                    },
            }
            | Statement::CreateSource {
                stmt:
                    CreateSourceStatement {
                        with_properties, ..
                    },
                ..
            }
            | Statement::CreateSubscription {
                stmt:
                    CreateSubscriptionStatement {
                        with_properties, ..
                    },
                ..
            }
            | Statement::CreateIndex {
                with_properties, ..
            } => Self::try_from(with_properties.0.as_slice()),
            // Table: options are stored in `with_options`.
            Statement::CreateTable { with_options, .. } => Self::try_from(with_options.as_slice()),

            // Every other statement has no `WITH` options to extract.
            _ => Ok(Default::default()),
        }
    }
}