risingwave_frontend/handler/
create_connection.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::system_param::reader::SystemParamsRead;
19use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
20use risingwave_connector::sink::elasticsearch_opensearch::elasticsearch::ES_SINK;
21use risingwave_connector::source::enforce_secret_connection;
22use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
23use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
24use risingwave_pb::catalog::connection_params::ConnectionType;
25use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams};
26use risingwave_pb::ddl_service::create_connection_request;
27use risingwave_pb::secret::SecretRef;
28use risingwave_pb::secret::secret_ref::RefAsType;
29use risingwave_sqlparser::ast::CreateConnectionStatement;
30
31use super::RwPgResponse;
32use crate::WithOptions;
33use crate::binder::Binder;
34use crate::catalog::SecretId;
35use crate::catalog::catalog_service::CatalogReadGuard;
36use crate::error::ErrorCode::ProtocolError;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::handler::HandlerArgs;
39use crate::session::SessionImpl;
40use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
41
42pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
43
44#[inline(always)]
45fn get_connection_property_required(
46    with_properties: &mut BTreeMap<String, String>,
47    property: &str,
48) -> Result<String> {
49    with_properties.remove(property).ok_or_else(|| {
50        RwError::from(ProtocolError(format!(
51            "Required property \"{property}\" is not provided"
52        )))
53    })
54}
55fn resolve_create_connection_payload(
56    with_properties: WithOptions,
57    session: &SessionImpl,
58) -> Result<create_connection_request::Payload> {
59    if !with_properties.connection_ref().is_empty() {
60        return Err(RwError::from(ErrorCode::InvalidParameterValue(
61            "Connection reference is not allowed in options in CREATE CONNECTION".to_owned(),
62        )));
63    }
64
65    let (mut props, secret_refs) =
66        resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
67    let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
68    let connection_type = match connection_type.as_str() {
69        PRIVATELINK_CONNECTION => {
70            return Err(RwError::from(ErrorCode::Deprecated(
71            "CREATE CONNECTION to Private Link".to_owned(),
72            "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_owned(),
73        )));
74        }
75        KAFKA_CONNECTOR => ConnectionType::Kafka,
76        ICEBERG_CONNECTOR => ConnectionType::Iceberg,
77        SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
78        ES_SINK => ConnectionType::Elasticsearch,
79        _ => {
80            return Err(RwError::from(ProtocolError(format!(
81                "Connection type \"{connection_type}\" is not supported"
82            ))));
83        }
84    };
85    Ok(create_connection_request::Payload::ConnectionParams(
86        ConnectionParams {
87            connection_type: connection_type as i32,
88            properties: props.into_iter().collect(),
89            secret_refs: secret_refs.into_iter().collect(),
90        },
91    ))
92}
93
94pub async fn handle_create_connection(
95    handler_args: HandlerArgs,
96    stmt: CreateConnectionStatement,
97) -> Result<RwPgResponse> {
98    let session = handler_args.session.clone();
99    let db_name = &session.database();
100    let (schema_name, connection_name) =
101        Binder::resolve_schema_qualified_name(db_name, &stmt.connection_name)?;
102
103    if let Err(e) = session.check_connection_name_duplicated(stmt.connection_name) {
104        return if stmt.if_not_exists {
105            Ok(PgResponse::builder(StatementType::CREATE_CONNECTION)
106                .notice(format!(
107                    "connection \"{}\" exists, skipping",
108                    connection_name
109                ))
110                .into())
111        } else {
112            Err(e)
113        };
114    }
115    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
116    let mut with_properties = handler_args.with_options.clone().into_connector_props();
117    resolve_privatelink_in_with_option(&mut with_properties)?;
118    let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?;
119
120    let catalog_writer = session.catalog_writer()?;
121
122    if session
123        .env()
124        .system_params_manager()
125        .get_params()
126        .load()
127        .enforce_secret()
128    {
129        use risingwave_pb::ddl_service::create_connection_request::Payload::ConnectionParams;
130        let ConnectionParams(cp) = &create_connection_payload else {
131            unreachable!()
132        };
133        enforce_secret_connection(
134            &cp.connection_type(),
135            cp.properties.keys().map(|s| s.as_str()),
136        )?;
137    }
138
139    catalog_writer
140        .create_connection(
141            connection_name,
142            database_id,
143            schema_id,
144            session.user_id(),
145            create_connection_payload,
146        )
147        .await?;
148
149    Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
150}
151
152pub fn print_connection_params(
153    db_name: &str,
154    params: &PbConnectionParams,
155    catalog_reader: &CatalogReadGuard,
156) -> String {
157    let print_secret_ref = |secret_ref: &SecretRef| -> String {
158        // the lookup across all schemas in the database but should guarantee the secret exists
159        let (schema_name, secret_name) = catalog_reader
160            .find_schema_secret_by_secret_id(db_name, SecretId::from(secret_ref.secret_id))
161            .unwrap();
162        let maybe_print_as = match secret_ref.get_ref_as().unwrap() {
163            RefAsType::Text => "",
164            RefAsType::File => " AS FILE",
165            RefAsType::Unspecified => "",
166        };
167        format!("SECRET {}.{}{}", schema_name, secret_name, maybe_print_as,)
168    };
169    let deref_secrets = params
170        .get_secret_refs()
171        .iter()
172        .map(|(k, v)| (k.clone(), print_secret_ref(v)));
173    let mut props = params.get_properties().clone();
174    props.extend(deref_secrets);
175    serde_json::to_string(&props).unwrap()
176}