risingwave_frontend/handler/
alter_source_props.rs1use risingwave_pb::catalog::connection::Info::ConnectionParams;
15
16use super::RwPgResponse;
17use crate::catalog::catalog_service::CatalogReadGuard;
18use crate::catalog::root_catalog::SchemaPath;
19use crate::error::{ErrorCode, Result};
20use crate::handler::{HandlerArgs, ObjectName, SqlOption, StatementType};
21use crate::session::SessionImpl;
22use crate::utils::resolve_connection_ref_and_secret_ref;
23use crate::{Binder, WithOptions};
24
25pub async fn handle_alter_table_connector_props(
26 handler_args: HandlerArgs,
27 table_name: ObjectName,
28 alter_props: Vec<SqlOption>,
29) -> Result<RwPgResponse> {
30 let session = handler_args.session;
31 let db_name = &session.database();
32 let (schema_name, real_table_name) =
33 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
34 let search_path = session.config().search_path();
35 let user_name = &session.user_name();
36 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
37
38 let source_id = {
39 let reader = session.env().catalog_reader().read_guard();
40 let (table, schema_name) =
41 reader.get_any_table_by_name(db_name, schema_path, &real_table_name)?;
42 let Some(associate_source_id) = table.associated_source_id else {
43 return Err(ErrorCode::InvalidInputSyntax(
44 "Only table with connector can use ALTER TABLE CONNECTOR syntax.".to_owned(),
45 )
46 .into());
47 };
48
49 session.check_privilege_for_drop_alter(schema_name, &**table)?;
50 let (source_catalog, _) =
51 reader.get_source_by_id(db_name, schema_path, &associate_source_id.table_id)?;
52
53 ensure_alter_props_not_set_by_connection(
54 &reader,
55 db_name,
56 source_catalog.connection_id,
57 &alter_props,
58 )?;
59
60 tracing::info!(
61 "handle_alter_table_connector_props: table_name: {}, table id: {}, source_id: {}",
62 real_table_name,
63 table.id,
64 associate_source_id.table_id
65 );
66
67 associate_source_id.table_id
68 };
69
70 handle_alter_source_props_inner(&session, alter_props, source_id).await?;
71
72 Ok(RwPgResponse::empty_result(StatementType::ALTER_TABLE))
73}
74
75async fn handle_alter_source_props_inner(
76 session: &SessionImpl,
77 alter_props: Vec<SqlOption>,
78 source_id: u32,
79) -> Result<()> {
80 let meta_client = session.env().meta_client();
81 let (resolved_with_options, _, connector_conn_ref) = resolve_connection_ref_and_secret_ref(
82 WithOptions::try_from(alter_props.as_ref() as &[SqlOption])?,
83 session,
84 None,
85 )?;
86 let (changed_props, changed_secret_refs) = resolved_with_options.into_parts();
87 if connector_conn_ref.is_some() {
88 return Err(ErrorCode::InvalidInputSyntax(
89 "ALTER SOURCE CONNECTOR does not support CONNECTION".to_owned(),
90 )
91 .into());
92 }
93
94 meta_client
95 .alter_source_connector_props(
96 source_id,
97 changed_props,
98 changed_secret_refs,
99 connector_conn_ref, )
101 .await?;
102 Ok(())
103}
104
105pub async fn handle_alter_source_connector_props(
106 handler_args: HandlerArgs,
107 source_name: ObjectName,
108 alter_props: Vec<SqlOption>,
109) -> Result<RwPgResponse> {
110 let session = handler_args.session;
111 let db_name = &session.database();
112 let (schema_name, real_source_name) =
113 Binder::resolve_schema_qualified_name(db_name, source_name.clone())?;
114 let search_path = session.config().search_path();
115 let user_name = &session.user_name();
116 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
117
118 let source_id = {
119 let reader = session.env().catalog_reader().read_guard();
120 let (source, schema_name) =
121 reader.get_source_by_name(db_name, schema_path, &real_source_name)?;
122
123 if source.associated_table_id.is_some() {
125 return Err(ErrorCode::InvalidInputSyntax(
126 "Use `ALTER TABLE` to alter a table with connector.".to_owned(),
127 )
128 .into());
129 }
130
131 session.check_privilege_for_drop_alter(schema_name, &**source)?;
132
133 ensure_alter_props_not_set_by_connection(
134 &reader,
135 db_name,
136 source.connection_id,
137 &alter_props,
138 )?;
139
140 source.id
141 };
142
143 handle_alter_source_props_inner(&session, alter_props, source_id).await?;
144
145 Ok(RwPgResponse::empty_result(StatementType::ALTER_SOURCE))
146}
147
148pub(crate) fn ensure_alter_props_not_set_by_connection(
150 reader: &CatalogReadGuard,
151 db_name: &str,
152 connection_id: Option<u32>,
153 alter_props: &[SqlOption],
154) -> Result<()> {
155 if let Some(conn_id) = connection_id {
156 let conn = reader.get_connection_by_id(db_name, conn_id)?;
157 if let ConnectionParams(params) = &conn.info {
158 for prop in alter_props {
159 let prop_key = prop.name.real_value();
160 if params.properties.contains_key(&prop_key)
161 || params.secret_refs.contains_key(&prop_key)
162 {
163 return Err(ErrorCode::InvalidInputSyntax(
164 "Cannot alter connector properties that are set by CONNECTION..".to_owned(),
165 )
166 .into());
167 }
168 }
169 }
170 }
171 Ok(())
172}