risingwave_frontend/handler/
create_connection.rs1use std::collections::BTreeMap;
16
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::system_param::reader::SystemParamsRead;
19use risingwave_connector::connector_common::SCHEMA_REGISTRY_CONNECTION_TYPE;
20use risingwave_connector::sink::elasticsearch_opensearch::elasticsearch::ES_SINK;
21use risingwave_connector::source::enforce_secret_connection;
22use risingwave_connector::source::iceberg::ICEBERG_CONNECTOR;
23use risingwave_connector::source::kafka::{KAFKA_CONNECTOR, PRIVATELINK_CONNECTION};
24use risingwave_pb::catalog::connection_params::ConnectionType;
25use risingwave_pb::catalog::{ConnectionParams, PbConnectionParams};
26use risingwave_pb::ddl_service::create_connection_request;
27use risingwave_pb::secret::SecretRef;
28use risingwave_pb::secret::secret_ref::RefAsType;
29use risingwave_sqlparser::ast::CreateConnectionStatement;
30
31use super::RwPgResponse;
32use crate::WithOptions;
33use crate::binder::Binder;
34use crate::catalog::SecretId;
35use crate::catalog::catalog_service::CatalogReadGuard;
36use crate::error::ErrorCode::ProtocolError;
37use crate::error::{ErrorCode, Result, RwError};
38use crate::handler::HandlerArgs;
39use crate::session::SessionImpl;
40use crate::user::user_catalog::UserCatalog;
41use crate::user::{has_access_to_object, has_schema_usage_privilege};
42use crate::utils::{resolve_privatelink_in_with_option, resolve_secret_ref_in_with_options};
43
44pub(crate) const CONNECTION_TYPE_PROP: &str = "type";
45
46#[inline(always)]
47fn get_connection_property_required(
48 with_properties: &mut BTreeMap<String, String>,
49 property: &str,
50) -> Result<String> {
51 with_properties.remove(property).ok_or_else(|| {
52 RwError::from(ProtocolError(format!(
53 "Required property \"{property}\" is not provided"
54 )))
55 })
56}
57fn resolve_create_connection_payload(
58 with_properties: WithOptions,
59 session: &SessionImpl,
60) -> Result<create_connection_request::Payload> {
61 if !with_properties.connection_ref().is_empty() {
62 return Err(RwError::from(ErrorCode::InvalidParameterValue(
63 "Connection reference is not allowed in options in CREATE CONNECTION".to_owned(),
64 )));
65 }
66
67 let (mut props, secret_refs) =
68 resolve_secret_ref_in_with_options(with_properties, session)?.into_parts();
69 let connection_type = get_connection_property_required(&mut props, CONNECTION_TYPE_PROP)?;
70 let connection_type = match connection_type.as_str() {
71 PRIVATELINK_CONNECTION => {
72 return Err(RwError::from(ErrorCode::Deprecated(
73 "CREATE CONNECTION to Private Link".to_owned(),
74 "RisingWave Cloud Portal (Please refer to the doc https://docs.risingwave.com/cloud/create-a-connection/)".to_owned(),
75 )));
76 }
77 KAFKA_CONNECTOR => ConnectionType::Kafka,
78 ICEBERG_CONNECTOR => ConnectionType::Iceberg,
79 SCHEMA_REGISTRY_CONNECTION_TYPE => ConnectionType::SchemaRegistry,
80 ES_SINK => ConnectionType::Elasticsearch,
81 _ => {
82 return Err(RwError::from(ProtocolError(format!(
83 "Connection type \"{connection_type}\" is not supported"
84 ))));
85 }
86 };
87 Ok(create_connection_request::Payload::ConnectionParams(
88 ConnectionParams {
89 connection_type: connection_type as i32,
90 properties: props.into_iter().collect(),
91 secret_refs: secret_refs.into_iter().collect(),
92 },
93 ))
94}
95
96pub async fn handle_create_connection(
97 handler_args: HandlerArgs,
98 stmt: CreateConnectionStatement,
99) -> Result<RwPgResponse> {
100 let session = handler_args.session.clone();
101 let db_name = &session.database();
102 let (schema_name, connection_name) =
103 Binder::resolve_schema_qualified_name(db_name, &stmt.connection_name)?;
104
105 if let Err(e) = session.check_connection_name_duplicated(stmt.connection_name) {
106 return if stmt.if_not_exists {
107 Ok(PgResponse::builder(StatementType::CREATE_CONNECTION)
108 .notice(format!(
109 "connection \"{}\" exists, skipping",
110 connection_name
111 ))
112 .into())
113 } else {
114 Err(e)
115 };
116 }
117 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
118 let mut with_properties = handler_args.with_options.clone().into_connector_props();
119 resolve_privatelink_in_with_option(&mut with_properties)?;
120 let create_connection_payload = resolve_create_connection_payload(with_properties, &session)?;
121
122 let catalog_writer = session.catalog_writer()?;
123
124 if session
125 .env()
126 .system_params_manager()
127 .get_params()
128 .load()
129 .enforce_secret()
130 {
131 use risingwave_pb::ddl_service::create_connection_request::Payload::ConnectionParams;
132 let ConnectionParams(cp) = &create_connection_payload else {
133 unreachable!()
134 };
135 enforce_secret_connection(
136 &cp.connection_type(),
137 cp.properties.keys().map(|s| s.as_str()),
138 )?;
139 }
140
141 catalog_writer
142 .create_connection(
143 connection_name,
144 database_id,
145 schema_id,
146 session.user_id(),
147 create_connection_payload,
148 )
149 .await?;
150
151 Ok(PgResponse::empty_result(StatementType::CREATE_CONNECTION))
152}
153
154pub fn print_connection_params(
155 db_name: &str,
156 params: &PbConnectionParams,
157 catalog_reader: &CatalogReadGuard,
158) -> String {
159 print_connection_params_impl(db_name, params, catalog_reader, None)
160}
161
162pub fn print_connection_params_with_secret_visibility(
163 db_name: &str,
164 params: &PbConnectionParams,
165 catalog_reader: &CatalogReadGuard,
166 current_user: &UserCatalog,
167) -> String {
168 print_connection_params_impl(db_name, params, catalog_reader, Some(current_user))
169}
170
171fn print_connection_params_impl(
172 db_name: &str,
173 params: &PbConnectionParams,
174 catalog_reader: &CatalogReadGuard,
175 current_user: Option<&UserCatalog>,
176) -> String {
177 let print_secret_ref = |secret_ref: &SecretRef| -> String {
178 let maybe_print_as = match secret_ref.get_ref_as().unwrap() {
179 RefAsType::Text => "",
180 RefAsType::File => " AS FILE",
181 RefAsType::Unspecified => "",
182 };
183
184 let secret_id = SecretId::from(secret_ref.secret_id);
185 let (schema_name, schema_id, schema_owner, secret) = catalog_reader
186 .iter_schemas(db_name)
187 .unwrap()
188 .find_map(|schema| {
189 schema.get_secret_by_id(secret_id).map(|secret| {
190 (
191 schema.name.clone(),
192 schema.id(),
193 schema.owner,
194 secret.clone(),
195 )
196 })
197 })
198 .unwrap();
199
200 if let Some(current_user) = current_user {
201 let can_show_secret_ref = has_access_to_object(current_user, secret.id, secret.owner)
202 && has_schema_usage_privilege(current_user, schema_id, schema_owner);
203 if !can_show_secret_ref {
204 return format!("SECRET <redacted>{}", maybe_print_as);
205 }
206 }
207
208 let secret_name = secret.name.clone();
209 format!("SECRET {}.{}{}", schema_name, secret_name, maybe_print_as,)
210 };
211 let deref_secrets = params
212 .get_secret_refs()
213 .iter()
214 .map(|(k, v)| (k.clone(), print_secret_ref(v)));
215 let mut props = params.get_properties().clone();
216 props.extend(deref_secrets);
217 serde_json::to_string(&props).unwrap()
218}