risingwave_frontend/handler/
create_connection.rs1use std::collections::BTreeMap;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::system_param::reader::SystemParamsRead;
19use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
20use risingwave_connector::sink::elasticsearch_opensearch::elasticsearch::ES_SINK;
21use risingwave_connector::source::enforce_secret_connection;
22use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
23use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
24use risingwave_pb::catalog::connection_params::ConnectionType;
25use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams};
26use risingwave_pb::ddl_service::create_connection_request;
27use risingwave_pb::secret::SecretRef;
28use risingwave_pb::secret::secret_ref::RefAsType;
29use risingwave_sqlparser::ast::CreateConnectionStatement;
30
31use super::RwPgResponse;
32use crate::WithOptions;
33use crate::binder::Binder;
34use crate::catalog::SecretId;
35use crate::catalog::catalog_service::CatalogReadGuard;
36use crate::error::ErrorCode::ProtocolError;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::handler::HandlerArgs;
39use crate::session::SessionImpl;
40use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
41
/// Key of the `WITH` option whose value names the kind of connection being created.
pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
43
44#[inline(always)]
45fn get_connection_property_required(
46 with_properties: &mut BTreeMap<String, String>,
47 property: &str,
48) -> Result<String> {
49 with_properties.remove(property).ok_or_else(|| {
50 RwError::from(ProtocolError(format!(
51 "Required property \"{property}\" is not provided"
52 )))
53 })
54}
55fn resolve_create_connection_payload(
56 with_properties: WithOptions,
57 session: &SessionImpl,
58) -> Result<create_connection_request::Payload> {
59 if !with_properties.connection_ref().is_empty() {
60 return Err(RwError::from(ErrorCode::InvalidParameterValue(
61 "Connection reference is not allowed in options in CREATE CONNECTION".to_owned(),
62 )));
63 }
64
65 let (mut props, secret_refs) =
66 resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
67 let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
68 let connection_type = match connection_type.as_str() {
69 PRIVATELINK_CONNECTION => {
70 return Err(RwError::from(ErrorCode::Deprecated(
71 "CREATE CONNECTION to Private Link".to_owned(),
72 "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_owned(),
73 )));
74 }
75 KAFKA_CONNECTOR => ConnectionType::Kafka,
76 ICEBERG_CONNECTOR => ConnectionType::Iceberg,
77 SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
78 ES_SINK => ConnectionType::Elasticsearch,
79 _ => {
80 return Err(RwError::from(ProtocolError(format!(
81 "Connection type \"{connection_type}\" is not supported"
82 ))));
83 }
84 };
85 Ok(create_connection_request::Payload::ConnectionParams(
86 ConnectionParams {
87 connection_type: connection_type as i32,
88 properties: props.into_iter().collect(),
89 secret_refs: secret_refs.into_iter().collect(),
90 },
91 ))
92}
93
94pub async fn handle_create_connection(
95 handler_args: HandlerArgs,
96 stmt: CreateConnectionStatement,
97) -> Result<RwPgResponse> {
98 let session = handler_args.session.clone();
99 let db_name = &session.database();
100 let (schema_name, connection_name) =
101 Binder::resolve_schema_qualified_name(db_name, &stmt.connection_name)?;
102
103 if let Err(e) = session.check_connection_name_duplicated(stmt.connection_name) {
104 return if stmt.if_not_exists {
105 Ok(PgResponse::builder(StatementType::CREATE_CONNECTION)
106 .notice(format!(
107 "connection \"{}\" exists, skipping",
108 connection_name
109 ))
110 .into())
111 } else {
112 Err(e)
113 };
114 }
115 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
116 let mut with_properties = handler_args.with_options.clone().into_connector_props();
117 resolve_privatelink_in_with_option(&mut with_properties)?;
118 let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?;
119
120 let catalog_writer = session.catalog_writer()?;
121
122 if session
123 .env()
124 .system_params_manager()
125 .get_params()
126 .load()
127 .enforce_secret()
128 {
129 use risingwave_pb::ddl_service::create_connection_request::Payload::ConnectionParams;
130 let ConnectionParams(cp) = &create_connection_payload else {
131 unreachable!()
132 };
133 enforce_secret_connection(
134 &cp.connection_type(),
135 cp.properties.keys().map(|s| s.as_str()),
136 )?;
137 }
138
139 catalog_writer
140 .create_connection(
141 connection_name,
142 database_id,
143 schema_id,
144 session.user_id(),
145 create_connection_payload,
146 )
147 .await?;
148
149 Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
150}
151
152pub fn print_connection_params(
153 db_name: &str,
154 params: &PbConnectionParams,
155 catalog_reader: &CatalogReadGuard,
156) -> String {
157 let print_secret_ref = |secret_ref: &SecretRef| -> String {
158 let (schema_name, secret_name) = catalog_reader
160 .find_schema_secret_by_secret_id(db_name, SecretId::from(secret_ref.secret_id))
161 .unwrap();
162 let maybe_print_as = match secret_ref.get_ref_as().unwrap() {
163 RefAsType::Text => "",
164 RefAsType::File => " AS FILE",
165 RefAsType::Unspecified => "",
166 };
167 format!("SECRET {}.{}{}", schema_name, secret_name, maybe_print_as,)
168 };
169 let deref_secrets = params
170 .get_secret_refs()
171 .iter()
172 .map(|(k, v)| (k.clone(), print_secret_ref(v)));
173 let mut props = params.get_properties().clone();
174 props.extend(deref_secrets);
175 serde_json::to_string(&props).unwrap()
176}