risingwave_frontend/handler/
alter_table_column.rs1use std::sync::Arc;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::{bail, bail_not_implemented};
22use risingwave_pb::ddl_service::TableJobType;
23use risingwave_pb::stream_plan::StreamFragmentGraph;
24use risingwave_sqlparser::ast::{
25 AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
26};
27
28use super::create_source::SqlColumnStrategy;
29use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
30use super::{HandlerArgs, RwPgResponse};
31use crate::catalog::purify::try_purify_table_source_create_sql_ast;
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::table_catalog::TableType;
35use crate::error::{ErrorCode, Result, RwError};
36use crate::expr::ExprImpl;
37use crate::session::SessionImpl;
38use crate::{Binder, TableCatalog};
39
40pub async fn get_new_table_definition_for_cdc_table(
42 session: &Arc<SessionImpl>,
43 table_name: ObjectName,
44 new_columns: &[ColumnCatalog],
45) -> Result<(Statement, Arc<TableCatalog>)> {
46 let (original_catalog, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
47
48 assert_eq!(
49 original_catalog.row_id_index, None,
50 "primary key of cdc table must be user defined"
51 );
52
53 let mut definition = original_catalog.create_sql_ast()?;
55
56 {
59 let Statement::CreateTable {
60 columns,
61 constraints,
62 ..
63 } = &mut definition
64 else {
65 panic!("unexpected statement: {:?}", definition);
66 };
67
68 columns.clear();
69 constraints.clear();
70 }
71
72 let new_definition = try_purify_table_source_create_sql_ast(
73 definition,
74 new_columns,
75 None,
76 &original_catalog.pk_column_names(),
79 )?;
80
81 Ok((new_definition, original_catalog))
82}
83
84pub async fn get_replace_table_plan(
85 session: &Arc<SessionImpl>,
86 table_name: ObjectName,
87 new_definition: Statement,
88 old_catalog: &Arc<TableCatalog>,
89 sql_column_strategy: SqlColumnStrategy,
90) -> Result<(
91 Option<SourceCatalog>,
92 TableCatalog,
93 StreamFragmentGraph,
94 TableJobType,
95)> {
96 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
98 let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
99
100 let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
101 session,
102 table_name,
103 old_catalog,
104 handler_args.clone(),
105 new_definition,
106 col_id_gen,
107 sql_column_strategy,
108 )
109 .await?;
110
111 let mut table = table;
113 table.vnode_count = VnodeCount::set(old_catalog.vnode_count());
114
115 Ok((source, table, graph, job_type))
116}
117
118pub async fn handle_alter_table_column(
121 handler_args: HandlerArgs,
122 table_name: ObjectName,
123 operation: AlterTableOperation,
124) -> Result<RwPgResponse> {
125 let session = handler_args.session;
126 let (original_catalog, has_incoming_sinks) =
127 fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
128
129 if original_catalog.webhook_info.is_some() {
130 return Err(RwError::from(ErrorCode::BindError(
131 "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
132 )));
133 }
134
135 let mut definition = original_catalog.create_sql_ast_purified()?;
137 let Statement::CreateTable { columns, .. } = &mut definition else {
138 panic!("unexpected statement: {:?}", definition);
139 };
140
141 if has_incoming_sinks && matches!(operation, AlterTableOperation::DropColumn { .. }) {
142 return Err(ErrorCode::InvalidInputSyntax(
143 "dropping columns in target table of sinks is not supported".to_owned(),
144 ))?;
145 }
146
147 let sql_column_strategy = match operation {
165 AlterTableOperation::AddColumn {
166 column_def: new_column,
167 } => {
168 let new_column_name = new_column.name.real_value();
171 if columns
172 .iter()
173 .any(|c| c.name.real_value() == new_column_name)
174 {
175 Err(ErrorCode::InvalidInputSyntax(format!(
176 "column \"{new_column_name}\" of table \"{table_name}\" already exists"
177 )))?
178 }
179
180 if new_column
181 .options
182 .iter()
183 .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
184 {
185 Err(ErrorCode::InvalidInputSyntax(
186 "alter table add generated columns is not supported".to_owned(),
187 ))?
188 }
189
190 if new_column
191 .options
192 .iter()
193 .any(|x| matches!(x.option, ColumnOption::NotNull))
194 && !new_column
195 .options
196 .iter()
197 .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
198 {
199 return Err(ErrorCode::InvalidInputSyntax(
200 "alter table add NOT NULL columns must have default value".to_owned(),
201 ))?;
202 }
203
204 columns.push(new_column);
206
207 SqlColumnStrategy::FollowChecked
208 }
209
210 AlterTableOperation::DropColumn {
211 column_name,
212 if_exists,
213 cascade,
214 } => {
215 if cascade {
216 bail_not_implemented!(issue = 6903, "drop column cascade");
217 }
218
219 for column in original_catalog.columns() {
221 if let Some(expr) = column.generated_expr() {
222 let expr = ExprImpl::from_expr_proto(expr)?;
223 let refs = expr.collect_input_refs(original_catalog.columns().len());
224 for idx in refs.ones() {
225 let refed_column = &original_catalog.columns()[idx];
226 if refed_column.name() == column_name.real_value() {
227 bail!(format!(
228 "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
229 column_name,
230 column.name()
231 ))
232 }
233 }
234 }
235 }
236
237 let column_name = column_name.real_value();
239 let removed_column = columns
240 .extract_if(.., |c| c.name.real_value() == column_name)
241 .at_most_one()
242 .ok()
243 .unwrap();
244
245 if removed_column.is_some() {
246 } else if if_exists {
248 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
249 .notice(format!(
250 "column \"{}\" does not exist, skipping",
251 column_name
252 ))
253 .into());
254 } else {
255 Err(ErrorCode::InvalidInputSyntax(format!(
256 "column \"{}\" of table \"{}\" does not exist",
257 column_name, table_name
258 )))?
259 }
260
261 SqlColumnStrategy::FollowUnchecked
262 }
263
264 AlterTableOperation::AlterColumn { column_name, op } => {
265 let AlterColumnOperation::SetDataType {
266 data_type,
267 using: None,
268 } = op
269 else {
270 bail_not_implemented!(issue = 6903, "{op}");
271 };
272
273 let column_name = column_name.real_value();
275 let column = columns
276 .iter_mut()
277 .find(|c| c.name.real_value() == column_name)
278 .ok_or_else(|| {
279 ErrorCode::InvalidInputSyntax(format!(
280 "column \"{}\" of table \"{}\" does not exist",
281 column_name, table_name
282 ))
283 })?;
284
285 column.data_type = Some(data_type);
286
287 SqlColumnStrategy::FollowChecked
288 }
289
290 _ => unreachable!(),
291 };
292 let (source, table, graph, job_type) = get_replace_table_plan(
293 &session,
294 table_name,
295 definition,
296 &original_catalog,
297 sql_column_strategy,
298 )
299 .await?;
300
301 let catalog_writer = session.catalog_writer()?;
302
303 catalog_writer
304 .replace_table(
305 source.map(|x| x.to_prost()),
306 table.to_prost(),
307 graph,
308 job_type,
309 )
310 .await?;
311 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
312}
313
314pub fn fetch_table_catalog_for_alter(
315 session: &SessionImpl,
316 table_name: &ObjectName,
317) -> Result<(Arc<TableCatalog>, bool)> {
318 let db_name = &session.database();
319 let (schema_name, real_table_name) =
320 Binder::resolve_schema_qualified_name(db_name, table_name)?;
321 let search_path = session.config().search_path();
322 let user_name = &session.user_name();
323
324 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
325
326 {
327 let reader = session.env().catalog_reader().read_guard();
328 let (table, schema_name) =
329 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
330
331 match table.table_type() {
332 TableType::Table => {}
333
334 _ => Err(ErrorCode::InvalidInputSyntax(format!(
335 "\"{table_name}\" is not a table or cannot be altered"
336 )))?,
337 }
338
339 session.check_privilege_for_drop_alter(schema_name, &**table)?;
340
341 let has_incoming_sinks = reader
342 .get_schema_by_id(&table.database_id, &table.schema_id)?
343 .table_incoming_sinks(table.id)
344 .map(|sinks| !sinks.is_empty())
345 .unwrap_or(false);
346
347 Ok((table.clone(), has_incoming_sinks))
348 }
349}
350
351#[cfg(test)]
352mod tests {
353 use std::collections::HashMap;
354
355 use risingwave_common::catalog::{
356 DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
357 };
358 use risingwave_common::types::DataType;
359
360 use crate::catalog::root_catalog::SchemaPath;
361 use crate::test_utils::LocalFrontend;
362
363 #[tokio::test]
364 async fn test_add_column_handler() {
365 let frontend = LocalFrontend::new(Default::default()).await;
366 let session = frontend.session_ref();
367 let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
368
369 let sql = "create table t (i int, r real);";
370 frontend.run_sql(sql).await.unwrap();
371
372 let get_table = || {
373 let catalog_reader = session.env().catalog_reader().read_guard();
374 catalog_reader
375 .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
376 .unwrap()
377 .0
378 .clone()
379 };
380
381 let table = get_table();
382
383 let columns: HashMap<_, _> = table
384 .columns
385 .iter()
386 .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
387 .collect();
388
389 let sql = "alter table t add column s text;";
391 frontend.run_sql(sql).await.unwrap();
392
393 let altered_table = get_table();
394
395 let altered_columns: HashMap<_, _> = altered_table
396 .columns
397 .iter()
398 .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
399 .collect();
400
401 assert_eq!(columns.len() + 1, altered_columns.len());
403 assert_eq!(altered_columns["s"].0, DataType::Varchar);
404
405 assert_eq!(columns["i"], altered_columns["i"]);
407 assert_eq!(columns["r"], altered_columns["r"]);
408 assert_eq!(
409 columns[ROW_ID_COLUMN_NAME],
410 altered_columns[ROW_ID_COLUMN_NAME]
411 );
412
413 assert_eq!(
415 table.version.as_ref().unwrap().version_id + 1,
416 altered_table.version.as_ref().unwrap().version_id
417 );
418 assert_eq!(
419 table.version.as_ref().unwrap().next_column_id.next(),
420 altered_table.version.as_ref().unwrap().next_column_id
421 );
422 }
423}