// risingwave_frontend/handler/alter_table_column.rs

use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::ColumnCatalog;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::{bail, bail_not_implemented};
25use risingwave_connector::sink::catalog::SinkCatalog;
26use risingwave_pb::catalog::{Source, Table};
27use risingwave_pb::ddl_service::TableJobType;
28use risingwave_pb::stream_plan::stream_node::PbNodeBody;
29use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
30use risingwave_sqlparser::ast::{
31 AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
32};
33
34use super::create_source::SqlColumnStrategy;
35use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
36use super::{HandlerArgs, RwPgResponse};
37use crate::catalog::purify::try_purify_table_source_create_sql_ast;
38use crate::catalog::root_catalog::SchemaPath;
39use crate::catalog::table_catalog::TableType;
40use crate::error::{ErrorCode, Result, RwError};
41use crate::expr::{Expr, ExprImpl, InputRef, Literal};
42use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
43use crate::session::SessionImpl;
44use crate::session::current::notice_to_user;
45use crate::{Binder, TableCatalog};
46
47pub async fn get_new_table_definition_for_cdc_table(
49 session: &Arc<SessionImpl>,
50 table_name: ObjectName,
51 new_columns: &[ColumnCatalog],
52) -> Result<(Statement, Arc<TableCatalog>)> {
53 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
54
55 assert_eq!(
56 original_catalog.row_id_index, None,
57 "primary key of cdc table must be user defined"
58 );
59
60 let mut definition = original_catalog.create_sql_ast()?;
62
63 {
66 let Statement::CreateTable {
67 columns,
68 constraints,
69 ..
70 } = &mut definition
71 else {
72 panic!("unexpected statement: {:?}", definition);
73 };
74
75 columns.clear();
76 constraints.clear();
77 }
78
79 let new_definition = try_purify_table_source_create_sql_ast(
80 definition,
81 new_columns,
82 None,
83 &original_catalog.pk_column_names(),
86 )?;
87
88 Ok((new_definition, original_catalog))
89}
90
/// Build the plan for replacing a table's definition in place (used by
/// `ALTER TABLE` and CDC schema changes).
///
/// Generates a new stream fragment graph from `new_definition`, computes the
/// old→new column index mapping, and re-wires the mergers of any incoming
/// sinks so existing sink-into-table jobs keep working against the new layout.
///
/// Returns the optional associated source, the new table (protobuf), the
/// stream graph, the column index mapping, and the table job type.
pub async fn get_replace_table_plan(
    session: &Arc<SessionImpl>,
    table_name: ObjectName,
    new_definition: Statement,
    old_catalog: &Arc<TableCatalog>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    Option<Source>,
    Table,
    StreamFragmentGraph,
    ColIndexMapping,
    TableJobType,
)> {
    // Treat the altered definition as if it were a fresh `CREATE TABLE`.
    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
    // Column IDs are carried over from the old catalog so that unchanged
    // columns keep their identity across the replacement.
    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);

    let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        old_catalog,
        handler_args.clone(),
        new_definition,
        col_id_gen,
        sql_column_strategy,
    )
    .await?;

    // Map each old column to its position in the new table. A column only
    // maps if both its ID and its data type are unchanged; a changed data
    // type is treated as dropping the old column and adding a new one (the
    // user is notified).
    let col_index_mapping = ColIndexMapping::new(
        old_catalog
            .columns()
            .iter()
            .map(|old_c| {
                table.columns.iter().position(|new_c| {
                    let new_c = new_c.get_column_desc().unwrap();

                    let id_matches = || new_c.column_id == old_c.column_id().get_id();
                    let type_matches = || {
                        let original_data_type = old_c.data_type();
                        let new_data_type = DataType::from(new_c.column_type.as_ref().unwrap());
                        let matches = original_data_type == &new_data_type;
                        if !matches {
                            notice_to_user(format!("the data type of column \"{}\" has changed, treating as a new column", old_c.name()));
                        }
                        matches
                    };

                    id_matches() && type_matches()
                })
            })
            .collect(),
        table.columns.len(),
    );

    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();

    // The new column set sinks should target, excluding the hidden
    // `_rw_timestamp` column which sinks never write to.
    let target_columns = table
        .columns
        .iter()
        .map(|col| ColumnCatalog::from(col.clone()))
        .filter(|col| !col.is_rw_timestamp_column())
        .collect_vec();

    // Patch the union/merger nodes in the new graph so each existing incoming
    // sink's output is projected onto the new column layout.
    for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
        hijack_merger_for_target_table(
            &mut graph,
            &target_columns,
            &sink,
            Some(&sink.unique_identity()),
        )?;
    }

    // Carry over properties that must survive the replacement: the set of
    // incoming sinks and the vnode count of the original table.
    let mut table = table;
    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();

    Ok((source, table, graph, col_index_mapping, job_type))
}
185
186pub(crate) fn hijack_merger_for_target_table(
187 graph: &mut StreamFragmentGraph,
188 target_columns: &[ColumnCatalog],
189 sink: &SinkCatalog,
190 uniq_identify: Option<&str>,
191) -> Result<()> {
192 let mut sink_columns = sink.original_target_columns.clone();
193 if sink_columns.is_empty() {
194 sink_columns = target_columns.to_vec();
199 }
200
201 let mut i = 0;
202 let mut j = 0;
203 let mut exprs = Vec::new();
204
205 while j < target_columns.len() {
206 if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
207 exprs.push(ExprImpl::InputRef(Box::new(InputRef {
208 data_type: sink_columns[i].data_type().clone(),
209 index: i,
210 })));
211
212 i += 1;
213 j += 1;
214 } else {
215 exprs.push(ExprImpl::Literal(Box::new(Literal::new(
216 None,
217 target_columns[j].data_type().clone(),
218 ))));
219
220 j += 1;
221 }
222 }
223
224 let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
225 select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
226 ..Default::default()
227 }));
228
229 for fragment in graph.fragments.values_mut() {
230 if let Some(node) = &mut fragment.node {
231 insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
232 }
233 }
234
235 Ok(())
236}
237
/// Handle `ALTER TABLE ... ADD/DROP COLUMN` and
/// `ALTER TABLE ... ALTER COLUMN ... SET DATA TYPE` statements by editing the
/// purified `CREATE TABLE` AST and replacing the table plan.
///
/// `operation` must be one of the add/drop/alter-column variants; any other
/// variant reaching this function is a routing bug (`unreachable!`).
pub async fn handle_alter_table_column(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    operation: AlterTableOperation,
) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

    // Unsupported combination: altering a sink target table that also has
    // generated columns is not implemented.
    if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
        return Err(RwError::from(ErrorCode::BindError(
            "Alter a table with incoming sink and generated column has not been implemented."
                .to_owned(),
        )));
    }

    // Tables backed by a webhook source cannot be altered yet.
    if original_catalog.webhook_info.is_some() {
        return Err(RwError::from(ErrorCode::BindError(
            "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
        )));
    }

    // Start from the purified `CREATE TABLE` AST of the current catalog and
    // edit its column list in place below.
    let mut definition = original_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { columns, .. } = &mut definition else {
        panic!("unexpected statement: {:?}", definition);
    };

    // Dropping a column would break incoming sinks that write to it.
    if !original_catalog.incoming_sinks.is_empty()
        && matches!(operation, AlterTableOperation::DropColumn { .. })
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "dropping columns in target table of sinks is not supported".to_owned(),
        ))?;
    }

    let sql_column_strategy = match operation {
        AlterTableOperation::AddColumn {
            column_def: new_column,
        } => {
            // Reject duplicates against the columns already in the definition.
            let new_column_name = new_column.name.real_value();
            if columns
                .iter()
                .any(|c| c.name.real_value() == new_column_name)
            {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
                )))?
            }

            // Adding generated columns via ALTER is not supported.
            if new_column
                .options
                .iter()
                .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
            {
                Err(ErrorCode::InvalidInputSyntax(
                    "alter table add generated columns is not supported".to_owned(),
                ))?
            }

            // A NOT NULL column can only be added if it has a default value,
            // since existing rows must be backfilled with something.
            if new_column
                .options
                .iter()
                .any(|x| matches!(x.option, ColumnOption::NotNull))
                && !new_column
                    .options
                    .iter()
                    .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
            {
                return Err(ErrorCode::InvalidInputSyntax(
                    "alter table add NOT NULL columns must have default value".to_owned(),
                ))?;
            }

            columns.push(new_column);

            SqlColumnStrategy::FollowChecked
        }

        AlterTableOperation::DropColumn {
            column_name,
            if_exists,
            cascade,
        } => {
            if cascade {
                bail_not_implemented!(issue = 6903, "drop column cascade");
            }

            // Refuse to drop a column that any generated column references.
            for column in original_catalog.columns() {
                if let Some(expr) = column.generated_expr() {
                    let expr = ExprImpl::from_expr_proto(expr)?;
                    let refs = expr.collect_input_refs(original_catalog.columns().len());
                    for idx in refs.ones() {
                        let refed_column = &original_catalog.columns()[idx];
                        if refed_column.name() == column_name.real_value() {
                            bail!(format!(
                                "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
                                column_name,
                                column.name()
                            ))
                        }
                    }
                }
            }

            // Remove the column from the definition; column names are unique,
            // so at most one entry can match.
            let column_name = column_name.real_value();
            let removed_column = columns
                .extract_if(.., |c| c.name.real_value() == column_name)
                .at_most_one()
                .ok()
                .unwrap();

            if removed_column.is_some() {
                // Column found and removed; nothing more to do here.
            } else if if_exists {
                // `DROP COLUMN IF EXISTS` on a missing column is a no-op with
                // a notice rather than an error.
                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
                    .notice(format!(
                        "column \"{}\" does not exist, skipping",
                        column_name
                    ))
                    .into());
            } else {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{}\" of table \"{}\" does not exist",
                    column_name, table_name
                )))?
            }

            SqlColumnStrategy::FollowUnchecked
        }

        AlterTableOperation::AlterColumn { column_name, op } => {
            // Only `SET DATA TYPE` without a `USING` clause is supported.
            let AlterColumnOperation::SetDataType {
                data_type,
                using: None,
            } = op
            else {
                bail_not_implemented!(issue = 6903, "{op}");
            };

            let column_name = column_name.real_value();
            let column = columns
                .iter_mut()
                .find(|c| c.name.real_value() == column_name)
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "column \"{}\" of table \"{}\" does not exist",
                        column_name, table_name
                    ))
                })?;

            column.data_type = Some(data_type);

            SqlColumnStrategy::FollowChecked
        }

        // Other operations are dispatched to different handlers upstream.
        _ => unreachable!(),
    };
    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
        &session,
        table_name,
        definition,
        &original_catalog,
        sql_column_strategy,
    )
    .await?;

    let catalog_writer = session.catalog_writer()?;

    catalog_writer
        .replace_table(source, table, graph, col_index_mapping, job_type)
        .await?;
    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}
436
437pub fn fetch_table_catalog_for_alter(
438 session: &SessionImpl,
439 table_name: &ObjectName,
440) -> Result<Arc<TableCatalog>> {
441 let db_name = &session.database();
442 let (schema_name, real_table_name) =
443 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
444 let search_path = session.config().search_path();
445 let user_name = &session.user_name();
446
447 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
448
449 let original_catalog = {
450 let reader = session.env().catalog_reader().read_guard();
451 let (table, schema_name) =
452 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
453
454 match table.table_type() {
455 TableType::Table => {}
456
457 _ => Err(ErrorCode::InvalidInputSyntax(format!(
458 "\"{table_name}\" is not a table or cannot be altered"
459 )))?,
460 }
461
462 session.check_privilege_for_drop_alter(schema_name, &**table)?;
463
464 table.clone()
465 };
466
467 Ok(original_catalog)
468}
469
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// End-to-end check of `ALTER TABLE ... ADD COLUMN`: the new column
    /// appears with the right type, existing columns (including the hidden
    /// row-id column) keep their ids and types, and the table version advances.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let sql = "create table t (i int, r real);";
        frontend.run_sql(sql).await.unwrap();

        // Helper to re-read the current catalog entry for table `t`.
        let get_table = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap()
                .0
                .clone()
        };

        let table = get_table();

        // Snapshot of (name -> (data type, column id)) before the ALTER.
        let columns: HashMap<_, _> = table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        let sql = "alter table t add column s text;";
        frontend.run_sql(sql).await.unwrap();

        let altered_table = get_table();

        let altered_columns: HashMap<_, _> = altered_table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Exactly one column added; `text` maps to `Varchar` internally.
        assert_eq!(columns.len() + 1, altered_columns.len());
        assert_eq!(altered_columns["s"].0, DataType::Varchar);

        // Pre-existing columns, including the hidden row-id column, keep
        // both their data type and their column id.
        assert_eq!(columns["i"], altered_columns["i"]);
        assert_eq!(columns["r"], altered_columns["r"]);
        assert_eq!(
            columns[ROW_ID_COLUMN_NAME],
            altered_columns[ROW_ID_COLUMN_NAME]
        );

        // The table version is bumped by one and the next column id advances.
        assert_eq!(
            table.version.as_ref().unwrap().version_id + 1,
            altered_table.version.as_ref().unwrap().version_id
        );
        assert_eq!(
            table.version.as_ref().unwrap().next_column_id.next(),
            altered_table.version.as_ref().unwrap().next_column_id
        );
    }
}