risingwave_frontend/handler/
create_connection.rs1use std::collections::BTreeMap;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
19use risingwave_connector::sink::elasticsearch_opensearch::elasticsearch::ES_SINK;
20use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
21use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
22use risingwave_pb::catalog::connection_params::ConnectionType;
23use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams};
24use risingwave_pb::ddl_service::create_connection_request;
25use risingwave_pb::secret::SecretRef;
26use risingwave_sqlparser::ast::CreateConnectionStatement;
27
28use super::RwPgResponse;
29use crate::WithOptions;
30use crate::binder::Binder;
31use crate::catalog::SecretId;
32use crate::catalog::schema_catalog::SchemaCatalog;
33use crate::error::ErrorCode::ProtocolError;
34use crate::error::{ErrorCode, Result, RwError};
35use crate::handler::HandlerArgs;
36use crate::session::SessionImpl;
37use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
38
/// Key of the `WITH` option that carries the connection type in `CREATE CONNECTION`.
///
/// The value stored under this key is removed from the options and matched
/// against the supported connector names when the request payload is built.
pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
40
41#[inline(always)]
42fn get_connection_property_required(
43 with_properties: &mut BTreeMap<String, String>,
44 property: &str,
45) -> Result<String> {
46 with_properties.remove(property).ok_or_else(|| {
47 RwError::from(ProtocolError(format!(
48 "Required property \"{property}\" is not provided"
49 )))
50 })
51}
52fn resolve_create_connection_payload(
53 with_properties: WithOptions,
54 session: &SessionImpl,
55) -> Result<create_connection_request::Payload> {
56 if !with_properties.connection_ref().is_empty() {
57 return Err(RwError::from(ErrorCode::InvalidParameterValue(
58 "Connection reference is not allowed in options in CREATE CONNECTION".to_owned(),
59 )));
60 }
61
62 let (mut props, secret_refs) =
63 resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
64 let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
65 let connection_type = match connection_type.as_str() {
66 PRIVATELINK_CONNECTION => {
67 return Err(RwError::from(ErrorCode::Deprecated(
68 "CREATE CONNECTION to Private Link".to_owned(),
69 "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_owned(),
70 )));
71 }
72 KAFKA_CONNECTOR => ConnectionType::Kafka,
73 ICEBERG_CONNECTOR => ConnectionType::Iceberg,
74 SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
75 ES_SINK => ConnectionType::Elasticsearch,
76 _ => {
77 return Err(RwError::from(ProtocolError(format!(
78 "Connection type \"{connection_type}\" is not supported"
79 ))));
80 }
81 };
82 Ok(create_connection_request::Payload::ConnectionParams(
83 ConnectionParams {
84 connection_type: connection_type as i32,
85 properties: props.into_iter().collect(),
86 secret_refs: secret_refs.into_iter().collect(),
87 },
88 ))
89}
90
91pub async fn handle_create_connection(
92 handler_args: HandlerArgs,
93 stmt: CreateConnectionStatement,
94) -> Result<RwPgResponse> {
95 let session = handler_args.session.clone();
96 let db_name = &session.database();
97 let (schema_name, connection_name) =
98 Binder::resolve_schema_qualified_name(db_name, stmt.connection_name.clone())?;
99
100 if let Err(e) = session.check_connection_name_duplicated(stmt.connection_name) {
101 return if stmt.if_not_exists {
102 Ok(PgResponse::builder(StatementType::CREATE_CONNECTION)
103 .notice(format!(
104 "connection \"{}\" exists, skipping",
105 connection_name
106 ))
107 .into())
108 } else {
109 Err(e)
110 };
111 }
112 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
113 let mut with_properties = handler_args.with_options.clone().into_connector_props();
114 resolve_privatelink_in_with_option(&mut with_properties)?;
115 let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?;
116
117 let catalog_writer = session.catalog_writer()?;
118 catalog_writer
119 .create_connection(
120 connection_name,
121 database_id,
122 schema_id,
123 session.user_id(),
124 create_connection_payload,
125 )
126 .await?;
127
128 Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
129}
130
131pub fn print_connection_params(params: &PbConnectionParams, schema: &SchemaCatalog) -> String {
132 let print_secret_ref = |secret_ref: &SecretRef| -> String {
133 let secret_name = schema
134 .get_secret_by_id(&SecretId::from(secret_ref.secret_id))
135 .map(|s| s.name.as_str())
136 .unwrap();
137 format!(
138 "SECRET {} AS {}",
139 secret_name,
140 secret_ref.get_ref_as().unwrap().as_str_name()
141 )
142 };
143 let deref_secrets = params
144 .get_secret_refs()
145 .iter()
146 .map(|(k, v)| (k.clone(), print_secret_ref(v)));
147 let mut props = params.get_properties().clone();
148 props.extend(deref_secrets);
149 serde_json::to_string(&props).unwrap()
150}