risingwave_frontend/handler/
create_connection.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::BTreeMap;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
19use risingwave_connector::sink::elasticsearch_opensearch::elasticsearch::ES_SINK;
20use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
21use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
22use risingwave_pb::catalog::connection_params::ConnectionType;
23use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams};
24use risingwave_pb::ddl_service::create_connection_request;
25use risingwave_pb::secret::SecretRef;
26use risingwave_sqlparser::ast::CreateConnectionStatement;
27
28use super::RwPgResponse;
29use crate::WithOptions;
30use crate::binder::Binder;
31use crate::catalog::SecretId;
32use crate::catalog::schema_catalog::SchemaCatalog;
33use crate::error::ErrorCode::ProtocolError;
34use crate::error::{ErrorCode, Result, RwError};
35use crate::handler::HandlerArgs;
36use crate::session::SessionImpl;
37use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
38
39pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
40
41#[inline(always)]
42fn get_connection_property_required(
43    with_properties: &mut BTreeMap<String, String>,
44    property: &str,
45) -> Result<String> {
46    with_properties.remove(property).ok_or_else(|| {
47        RwError::from(ProtocolError(format!(
48            "Required property \"{property}\" is not provided"
49        )))
50    })
51}
52fn resolve_create_connection_payload(
53    with_properties: WithOptions,
54    session: &SessionImpl,
55) -> Result<create_connection_request::Payload> {
56    if !with_properties.connection_ref().is_empty() {
57        return Err(RwError::from(ErrorCode::InvalidParameterValue(
58            "Connection reference is not allowed in options in CREATE CONNECTION".to_owned(),
59        )));
60    }
61
62    let (mut props, secret_refs) =
63        resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
64    let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
65    let connection_type = match connection_type.as_str() {
66        PRIVATELINK_CONNECTION => {
67            return Err(RwError::from(ErrorCode::Deprecated(
68            "CREATE CONNECTION to Private Link".to_owned(),
69            "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_owned(),
70        )));
71        }
72        KAFKA_CONNECTOR => ConnectionType::Kafka,
73        ICEBERG_CONNECTOR => ConnectionType::Iceberg,
74        SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
75        ES_SINK => ConnectionType::Elasticsearch,
76        _ => {
77            return Err(RwError::from(ProtocolError(format!(
78                "Connection type \"{connection_type}\" is not supported"
79            ))));
80        }
81    };
82    Ok(create_connection_request::Payload::ConnectionParams(
83        ConnectionParams {
84            connection_type: connection_type as i32,
85            properties: props.into_iter().collect(),
86            secret_refs: secret_refs.into_iter().collect(),
87        },
88    ))
89}
90
91pub async fn handle_create_connection(
92    handler_args: HandlerArgs,
93    stmt: CreateConnectionStatement,
94) -> Result<RwPgResponse> {
95    let session = handler_args.session.clone();
96    let db_name = &session.database();
97    let (schema_name, connection_name) =
98        Binder::resolve_schema_qualified_name(db_name, stmt.connection_name.clone())?;
99
100    if let Err(e) = session.check_connection_name_duplicated(stmt.connection_name) {
101        return if stmt.if_not_exists {
102            Ok(PgResponse::builder(StatementType::CREATE_CONNECTION)
103                .notice(format!(
104                    "connection \"{}\" exists, skipping",
105                    connection_name
106                ))
107                .into())
108        } else {
109            Err(e)
110        };
111    }
112    let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
113    let mut with_properties = handler_args.with_options.clone().into_connector_props();
114    resolve_privatelink_in_with_option(&mut with_properties)?;
115    let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?;
116
117    let catalog_writer = session.catalog_writer()?;
118    catalog_writer
119        .create_connection(
120            connection_name,
121            database_id,
122            schema_id,
123            session.user_id(),
124            create_connection_payload,
125        )
126        .await?;
127
128    Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
129}
130
131pub fn print_connection_params(params: &PbConnectionParams, schema: &SchemaCatalog) -> String {
132    let print_secret_ref = |secret_ref: &SecretRef| -> String {
133        let secret_name = schema
134            .get_secret_by_id(&SecretId::from(secret_ref.secret_id))
135            .map(|s| s.name.as_str())
136            .unwrap();
137        format!(
138            "SECRET {} AS {}",
139            secret_name,
140            secret_ref.get_ref_as().unwrap().as_str_name()
141        )
142    };
143    let deref_secrets = params
144        .get_secret_refs()
145        .iter()
146        .map(|(k, v)| (k.clone(), print_secret_ref(v)));
147    let mut props = params.get_properties().clone();
148    props.extend(deref_secrets);
149    serde_json::to_string(&props).unwrap()
150}