risingwave_frontend/handler/
create_connection.rs1use std::collections::BTreeMap;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::system_param::reader::SystemParamsRead;
19use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
20use risingwave_connector::sink::elasticsearch_opensearch::elasticsearch::ES_SINK;
21use risingwave_connector::source::enforce_secret_connection;
22use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
23use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
24use risingwave_pb::catalog::connection_params::ConnectionType;
25use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams};
26use risingwave_pb::ddl_service::create_connection_request;
27use risingwave_pb::secret::SecretRef;
28use risingwave_sqlparser::ast::CreateConnectionStatement;
29
30use super::RwPgResponse;
31use crate::WithOptions;
32use crate::binder::Binder;
33use crate::catalog::SecretId;
34use crate::catalog::schema_catalog::SchemaCatalog;
35use crate::error::ErrorCode::ProtocolError;
36use crate::error::{ErrorCode, Result, RwError};
37use crate::handler::HandlerArgs;
38use crate::session::SessionImpl;
39use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
40
41pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
42
43#[inline(always)]
44fn get_connection_property_required(
45 with_properties: &mut BTreeMap<String, String>,
46 property: &str,
47) -> Result<String> {
48 with_properties.remove(property).ok_or_else(|| {
49 RwError::from(ProtocolError(format!(
50 "Required property \"{property}\" is not provided"
51 )))
52 })
53}
54fn resolve_create_connection_payload(
55 with_properties: WithOptions,
56 session: &SessionImpl,
57) -> Result<create_connection_request::Payload> {
58 if !with_properties.connection_ref().is_empty() {
59 return Err(RwError::from(ErrorCode::InvalidParameterValue(
60 "Connection reference is not allowed in options in CREATE CONNECTION".to_owned(),
61 )));
62 }
63
64 let (mut props, secret_refs) =
65 resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
66 let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
67 let connection_type = match connection_type.as_str() {
68 PRIVATELINK_CONNECTION => {
69 return Err(RwError::from(ErrorCode::Deprecated(
70 "CREATE CONNECTION to Private Link".to_owned(),
71 "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_owned(),
72 )));
73 }
74 KAFKA_CONNECTOR => ConnectionType::Kafka,
75 ICEBERG_CONNECTOR => ConnectionType::Iceberg,
76 SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
77 ES_SINK => ConnectionType::Elasticsearch,
78 _ => {
79 return Err(RwError::from(ProtocolError(format!(
80 "Connection type \"{connection_type}\" is not supported"
81 ))));
82 }
83 };
84 Ok(create_connection_request::Payload::ConnectionParams(
85 ConnectionParams {
86 connection_type: connection_type as i32,
87 properties: props.into_iter().collect(),
88 secret_refs: secret_refs.into_iter().collect(),
89 },
90 ))
91}
92
93pub async fn handle_create_connection(
94 handler_args: HandlerArgs,
95 stmt: CreateConnectionStatement,
96) -> Result<RwPgResponse> {
97 let session = handler_args.session.clone();
98 let db_name = &session.database();
99 let (schema_name, connection_name) =
100 Binder::resolve_schema_qualified_name(db_name, stmt.connection_name.clone())?;
101
102 if let Err(e) = session.check_connection_name_duplicated(stmt.connection_name) {
103 return if stmt.if_not_exists {
104 Ok(PgResponse::builder(StatementType::CREATE_CONNECTION)
105 .notice(format!(
106 "connection \"{}\" exists, skipping",
107 connection_name
108 ))
109 .into())
110 } else {
111 Err(e)
112 };
113 }
114 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
115 let mut with_properties = handler_args.with_options.clone().into_connector_props();
116 resolve_privatelink_in_with_option(&mut with_properties)?;
117 let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?;
118
119 let catalog_writer = session.catalog_writer()?;
120
121 if session
122 .env()
123 .system_params_manager()
124 .get_params()
125 .load()
126 .enforce_secret()
127 {
128 use risingwave_pb::ddl_service::create_connection_request::Payload::ConnectionParams;
129 let ConnectionParams(cp) = &create_connection_payload else {
130 unreachable!()
131 };
132 enforce_secret_connection(
133 &cp.connection_type(),
134 cp.properties.keys().map(|s| s.as_str()),
135 )?;
136 }
137
138 catalog_writer
139 .create_connection(
140 connection_name,
141 database_id,
142 schema_id,
143 session.user_id(),
144 create_connection_payload,
145 )
146 .await?;
147
148 Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
149}
150
151pub fn print_connection_params(params: &PbConnectionParams, schema: &SchemaCatalog) -> String {
152 let print_secret_ref = |secret_ref: &SecretRef| -> String {
153 let secret_name = schema
154 .get_secret_by_id(&SecretId::from(secret_ref.secret_id))
155 .map(|s| s.name.as_str())
156 .unwrap();
157 format!(
158 "SECRET {} AS {}",
159 secret_name,
160 secret_ref.get_ref_as().unwrap().as_str_name()
161 )
162 };
163 let deref_secrets = params
164 .get_secret_refs()
165 .iter()
166 .map(|(k, v)| (k.clone(), print_secret_ref(v)));
167 let mut props = params.get_properties().clone();
168 props.extend(deref_secrets);
169 serde_json::to_string(&props).unwrap()
170}