risingwave_frontend/handler/
alter_table_column.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::ColumnCatalog;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::{bail, bail_not_implemented};
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::catalog::{Source, Table};
25use risingwave_pb::ddl_service::TableJobType;
26use risingwave_pb::stream_plan::stream_node::PbNodeBody;
27use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
28use risingwave_sqlparser::ast::{
29 AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
30};
31
32use super::create_source::SqlColumnStrategy;
33use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
34use super::{HandlerArgs, RwPgResponse};
35use crate::catalog::purify::try_purify_table_source_create_sql_ast;
36use crate::catalog::root_catalog::SchemaPath;
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result, RwError};
39use crate::expr::{Expr, ExprImpl, InputRef, Literal};
40use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
41use crate::session::SessionImpl;
42use crate::{Binder, TableCatalog};
43
44pub async fn get_new_table_definition_for_cdc_table(
46 session: &Arc<SessionImpl>,
47 table_name: ObjectName,
48 new_columns: &[ColumnCatalog],
49) -> Result<(Statement, Arc<TableCatalog>)> {
50 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
51
52 assert_eq!(
53 original_catalog.row_id_index, None,
54 "primary key of cdc table must be user defined"
55 );
56
57 let mut definition = original_catalog.create_sql_ast()?;
59
60 {
63 let Statement::CreateTable {
64 columns,
65 constraints,
66 ..
67 } = &mut definition
68 else {
69 panic!("unexpected statement: {:?}", definition);
70 };
71
72 columns.clear();
73 constraints.clear();
74 }
75
76 let new_definition = try_purify_table_source_create_sql_ast(
77 definition,
78 new_columns,
79 None,
80 &original_catalog.pk_column_names(),
83 )?;
84
85 Ok((new_definition, original_catalog))
86}
87
88pub async fn get_replace_table_plan(
89 session: &Arc<SessionImpl>,
90 table_name: ObjectName,
91 new_definition: Statement,
92 old_catalog: &Arc<TableCatalog>,
93 sql_column_strategy: SqlColumnStrategy,
94) -> Result<(Option<Source>, Table, StreamFragmentGraph, TableJobType)> {
95 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
97 let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
98
99 let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
100 session,
101 table_name,
102 old_catalog,
103 handler_args.clone(),
104 new_definition,
105 col_id_gen,
106 sql_column_strategy,
107 )
108 .await?;
109
110 let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
111
112 let target_columns = table
113 .columns
114 .iter()
115 .map(|col| ColumnCatalog::from(col.clone()))
116 .filter(|col| !col.is_rw_timestamp_column())
117 .collect_vec();
118
119 for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
120 hijack_merger_for_target_table(
121 &mut graph,
122 &target_columns,
123 &sink,
124 Some(&sink.unique_identity()),
125 )?;
126 }
127
128 let mut table = table;
130 table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
131 table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
132
133 Ok((source, table, graph, job_type))
134}
135
136pub(crate) fn hijack_merger_for_target_table(
137 graph: &mut StreamFragmentGraph,
138 target_columns: &[ColumnCatalog],
139 sink: &SinkCatalog,
140 uniq_identify: Option<&str>,
141) -> Result<()> {
142 let mut sink_columns = sink.original_target_columns.clone();
143 if sink_columns.is_empty() {
144 sink_columns = target_columns.to_vec();
149 }
150
151 let mut i = 0;
152 let mut j = 0;
153 let mut exprs = Vec::new();
154
155 while j < target_columns.len() {
156 if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
157 exprs.push(ExprImpl::InputRef(Box::new(InputRef {
158 data_type: sink_columns[i].data_type().clone(),
159 index: i,
160 })));
161
162 i += 1;
163 j += 1;
164 } else {
165 exprs.push(ExprImpl::Literal(Box::new(Literal::new(
166 None,
167 target_columns[j].data_type().clone(),
168 ))));
169
170 j += 1;
171 }
172 }
173
174 let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
175 select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
176 ..Default::default()
177 }));
178
179 for fragment in graph.fragments.values_mut() {
180 if let Some(node) = &mut fragment.node {
181 insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
182 }
183 }
184
185 Ok(())
186}
187
188pub async fn handle_alter_table_column(
191 handler_args: HandlerArgs,
192 table_name: ObjectName,
193 operation: AlterTableOperation,
194) -> Result<RwPgResponse> {
195 let session = handler_args.session;
196 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
197
198 if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
199 return Err(RwError::from(ErrorCode::BindError(
200 "Alter a table with incoming sink and generated column has not been implemented."
201 .to_owned(),
202 )));
203 }
204
205 if original_catalog.webhook_info.is_some() {
206 return Err(RwError::from(ErrorCode::BindError(
207 "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
208 )));
209 }
210
211 let mut definition = original_catalog.create_sql_ast_purified()?;
213 let Statement::CreateTable { columns, .. } = &mut definition else {
214 panic!("unexpected statement: {:?}", definition);
215 };
216
217 if !original_catalog.incoming_sinks.is_empty()
218 && matches!(operation, AlterTableOperation::DropColumn { .. })
219 {
220 return Err(ErrorCode::InvalidInputSyntax(
221 "dropping columns in target table of sinks is not supported".to_owned(),
222 ))?;
223 }
224
225 let sql_column_strategy = match operation {
243 AlterTableOperation::AddColumn {
244 column_def: new_column,
245 } => {
246 let new_column_name = new_column.name.real_value();
249 if columns
250 .iter()
251 .any(|c| c.name.real_value() == new_column_name)
252 {
253 Err(ErrorCode::InvalidInputSyntax(format!(
254 "column \"{new_column_name}\" of table \"{table_name}\" already exists"
255 )))?
256 }
257
258 if new_column
259 .options
260 .iter()
261 .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
262 {
263 Err(ErrorCode::InvalidInputSyntax(
264 "alter table add generated columns is not supported".to_owned(),
265 ))?
266 }
267
268 if new_column
269 .options
270 .iter()
271 .any(|x| matches!(x.option, ColumnOption::NotNull))
272 && !new_column
273 .options
274 .iter()
275 .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
276 {
277 return Err(ErrorCode::InvalidInputSyntax(
278 "alter table add NOT NULL columns must have default value".to_owned(),
279 ))?;
280 }
281
282 columns.push(new_column);
284
285 SqlColumnStrategy::FollowChecked
286 }
287
288 AlterTableOperation::DropColumn {
289 column_name,
290 if_exists,
291 cascade,
292 } => {
293 if cascade {
294 bail_not_implemented!(issue = 6903, "drop column cascade");
295 }
296
297 for column in original_catalog.columns() {
299 if let Some(expr) = column.generated_expr() {
300 let expr = ExprImpl::from_expr_proto(expr)?;
301 let refs = expr.collect_input_refs(original_catalog.columns().len());
302 for idx in refs.ones() {
303 let refed_column = &original_catalog.columns()[idx];
304 if refed_column.name() == column_name.real_value() {
305 bail!(format!(
306 "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
307 column_name,
308 column.name()
309 ))
310 }
311 }
312 }
313 }
314
315 let column_name = column_name.real_value();
317 let removed_column = columns
318 .extract_if(.., |c| c.name.real_value() == column_name)
319 .at_most_one()
320 .ok()
321 .unwrap();
322
323 if removed_column.is_some() {
324 } else if if_exists {
326 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
327 .notice(format!(
328 "column \"{}\" does not exist, skipping",
329 column_name
330 ))
331 .into());
332 } else {
333 Err(ErrorCode::InvalidInputSyntax(format!(
334 "column \"{}\" of table \"{}\" does not exist",
335 column_name, table_name
336 )))?
337 }
338
339 SqlColumnStrategy::FollowUnchecked
340 }
341
342 AlterTableOperation::AlterColumn { column_name, op } => {
343 let AlterColumnOperation::SetDataType {
344 data_type,
345 using: None,
346 } = op
347 else {
348 bail_not_implemented!(issue = 6903, "{op}");
349 };
350
351 let column_name = column_name.real_value();
353 let column = columns
354 .iter_mut()
355 .find(|c| c.name.real_value() == column_name)
356 .ok_or_else(|| {
357 ErrorCode::InvalidInputSyntax(format!(
358 "column \"{}\" of table \"{}\" does not exist",
359 column_name, table_name
360 ))
361 })?;
362
363 column.data_type = Some(data_type);
364
365 SqlColumnStrategy::FollowChecked
366 }
367
368 _ => unreachable!(),
369 };
370 let (source, table, graph, job_type) = get_replace_table_plan(
371 &session,
372 table_name,
373 definition,
374 &original_catalog,
375 sql_column_strategy,
376 )
377 .await?;
378
379 let catalog_writer = session.catalog_writer()?;
380
381 catalog_writer
382 .replace_table(source, table, graph, job_type)
383 .await?;
384 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
385}
386
387pub fn fetch_table_catalog_for_alter(
388 session: &SessionImpl,
389 table_name: &ObjectName,
390) -> Result<Arc<TableCatalog>> {
391 let db_name = &session.database();
392 let (schema_name, real_table_name) =
393 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
394 let search_path = session.config().search_path();
395 let user_name = &session.user_name();
396
397 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
398
399 let original_catalog = {
400 let reader = session.env().catalog_reader().read_guard();
401 let (table, schema_name) =
402 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
403
404 match table.table_type() {
405 TableType::Table => {}
406
407 _ => Err(ErrorCode::InvalidInputSyntax(format!(
408 "\"{table_name}\" is not a table or cannot be altered"
409 )))?,
410 }
411
412 session.check_privilege_for_drop_alter(schema_name, &**table)?;
413
414 table.clone()
415 };
416
417 Ok(original_catalog)
418}
419
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Adds a column to a freshly created table and checks that:
    /// the column count grows by one, the new column is `Varchar`, the existing
    /// columns keep their data type and column id, and the table version and next
    /// column id each advance by one step.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Reads the current catalog entry of table `t`.
        let load_table = || {
            let reader = session.env().catalog_reader().read_guard();
            let (table, _) = reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap();
            table.clone()
        };

        let table = load_table();
        let columns: HashMap<_, _> = table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        frontend
            .run_sql("alter table t add column s text;")
            .await
            .unwrap();

        let altered_table = load_table();
        let altered_columns: HashMap<_, _> = altered_table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Exactly one column was added, with the expected type.
        assert_eq!(columns.len() + 1, altered_columns.len());
        assert_eq!(altered_columns["s"].0, DataType::Varchar);

        // Pre-existing columns are untouched.
        assert_eq!(columns["i"], altered_columns["i"]);
        assert_eq!(columns["r"], altered_columns["r"]);
        assert_eq!(
            columns[ROW_ID_COLUMN_NAME],
            altered_columns[ROW_ID_COLUMN_NAME]
        );

        // Version bookkeeping advanced by one step.
        let old_version = table.version.as_ref().unwrap();
        let new_version = altered_table.version.as_ref().unwrap();
        assert_eq!(old_version.version_id + 1, new_version.version_id);
        assert_eq!(
            old_version.next_column_id.next(),
            new_version.next_column_id
        );
    }
}