risingwave_frontend/handler/
alter_table_column.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::ColumnCatalog;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::{bail, bail_not_implemented};
25use risingwave_connector::sink::catalog::SinkCatalog;
26use risingwave_pb::catalog::{Source, Table};
27use risingwave_pb::ddl_service::TableJobType;
28use risingwave_pb::stream_plan::stream_node::PbNodeBody;
29use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
30use risingwave_sqlparser::ast::{
31 AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
32};
33
34use super::create_source::SqlColumnStrategy;
35use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
36use super::{HandlerArgs, RwPgResponse};
37use crate::catalog::purify::try_purify_table_source_create_sql_ast;
38use crate::catalog::root_catalog::SchemaPath;
39use crate::catalog::table_catalog::TableType;
40use crate::error::{ErrorCode, Result, RwError};
41use crate::expr::{Expr, ExprImpl, InputRef, Literal};
42use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
43use crate::session::SessionImpl;
44use crate::{Binder, TableCatalog};
45
46pub async fn get_new_table_definition_for_cdc_table(
48 session: &Arc<SessionImpl>,
49 table_name: ObjectName,
50 new_columns: &[ColumnCatalog],
51) -> Result<(Statement, Arc<TableCatalog>)> {
52 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
53
54 assert_eq!(
55 original_catalog.row_id_index, None,
56 "primary key of cdc table must be user defined"
57 );
58
59 let mut definition = original_catalog.create_sql_ast()?;
61
62 {
65 let Statement::CreateTable {
66 columns,
67 constraints,
68 ..
69 } = &mut definition
70 else {
71 panic!("unexpected statement: {:?}", definition);
72 };
73
74 columns.clear();
75 constraints.clear();
76 }
77
78 let new_definition = try_purify_table_source_create_sql_ast(
79 definition,
80 new_columns,
81 None,
82 &original_catalog.pk_column_names(),
85 )?;
86
87 Ok((new_definition, original_catalog))
88}
89
90pub async fn get_replace_table_plan(
91 session: &Arc<SessionImpl>,
92 table_name: ObjectName,
93 new_definition: Statement,
94 old_catalog: &Arc<TableCatalog>,
95 sql_column_strategy: SqlColumnStrategy,
96) -> Result<(
97 Option<Source>,
98 Table,
99 StreamFragmentGraph,
100 ColIndexMapping,
101 TableJobType,
102)> {
103 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
105 let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
106
107 let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
108 session,
109 table_name,
110 old_catalog,
111 handler_args.clone(),
112 new_definition,
113 col_id_gen,
114 sql_column_strategy,
115 )
116 .await?;
117
118 let col_index_mapping = ColIndexMapping::new(
128 old_catalog
129 .columns()
130 .iter()
131 .map(|old_c| {
132 table.columns.iter().position(|new_c| {
133 let new_c = new_c.get_column_desc().unwrap();
134
135 let id_matches = || new_c.column_id == old_c.column_id().get_id();
143 let type_matches = || {
144 let original_data_type = old_c.data_type();
145 let new_data_type = DataType::from(new_c.column_type.as_ref().unwrap());
146 original_data_type == &new_data_type
147 };
148
149 id_matches() && type_matches()
150 })
151 })
152 .collect(),
153 table.columns.len(),
154 );
155
156 let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
157
158 let target_columns = table
159 .columns
160 .iter()
161 .map(|col| ColumnCatalog::from(col.clone()))
162 .filter(|col| !col.is_rw_timestamp_column())
163 .collect_vec();
164
165 for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
166 hijack_merger_for_target_table(
167 &mut graph,
168 &target_columns,
169 &sink,
170 Some(&sink.unique_identity()),
171 )?;
172 }
173
174 let mut table = table;
176 table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
177 table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
178
179 Ok((source, table, graph, col_index_mapping, job_type))
180}
181
182pub(crate) fn hijack_merger_for_target_table(
183 graph: &mut StreamFragmentGraph,
184 target_columns: &[ColumnCatalog],
185 sink: &SinkCatalog,
186 uniq_identify: Option<&str>,
187) -> Result<()> {
188 let mut sink_columns = sink.original_target_columns.clone();
189 if sink_columns.is_empty() {
190 sink_columns = target_columns.to_vec();
195 }
196
197 let mut i = 0;
198 let mut j = 0;
199 let mut exprs = Vec::new();
200
201 while j < target_columns.len() {
202 if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
203 exprs.push(ExprImpl::InputRef(Box::new(InputRef {
204 data_type: sink_columns[i].data_type().clone(),
205 index: i,
206 })));
207
208 i += 1;
209 j += 1;
210 } else {
211 exprs.push(ExprImpl::Literal(Box::new(Literal::new(
212 None,
213 target_columns[j].data_type().clone(),
214 ))));
215
216 j += 1;
217 }
218 }
219
220 let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
221 select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
222 ..Default::default()
223 }));
224
225 for fragment in graph.fragments.values_mut() {
226 if let Some(node) = &mut fragment.node {
227 insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
228 }
229 }
230
231 Ok(())
232}
233
234pub async fn handle_alter_table_column(
237 handler_args: HandlerArgs,
238 table_name: ObjectName,
239 operation: AlterTableOperation,
240) -> Result<RwPgResponse> {
241 let session = handler_args.session;
242 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
243
244 if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
245 return Err(RwError::from(ErrorCode::BindError(
246 "Alter a table with incoming sink and generated column has not been implemented."
247 .to_owned(),
248 )));
249 }
250
251 if original_catalog.webhook_info.is_some() {
252 return Err(RwError::from(ErrorCode::BindError(
253 "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
254 )));
255 }
256
257 let mut definition = original_catalog.create_sql_ast_purified()?;
259 let Statement::CreateTable { columns, .. } = &mut definition else {
260 panic!("unexpected statement: {:?}", definition);
261 };
262
263 if !original_catalog.incoming_sinks.is_empty()
264 && matches!(operation, AlterTableOperation::DropColumn { .. })
265 {
266 return Err(ErrorCode::InvalidInputSyntax(
267 "dropping columns in target table of sinks is not supported".to_owned(),
268 ))?;
269 }
270
271 let sql_column_strategy = match operation {
289 AlterTableOperation::AddColumn {
290 column_def: new_column,
291 } => {
292 let new_column_name = new_column.name.real_value();
295 if columns
296 .iter()
297 .any(|c| c.name.real_value() == new_column_name)
298 {
299 Err(ErrorCode::InvalidInputSyntax(format!(
300 "column \"{new_column_name}\" of table \"{table_name}\" already exists"
301 )))?
302 }
303
304 if new_column
305 .options
306 .iter()
307 .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
308 {
309 Err(ErrorCode::InvalidInputSyntax(
310 "alter table add generated columns is not supported".to_owned(),
311 ))?
312 }
313
314 if new_column
315 .options
316 .iter()
317 .any(|x| matches!(x.option, ColumnOption::NotNull))
318 && !new_column
319 .options
320 .iter()
321 .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
322 {
323 return Err(ErrorCode::InvalidInputSyntax(
324 "alter table add NOT NULL columns must have default value".to_owned(),
325 ))?;
326 }
327
328 columns.push(new_column);
330
331 SqlColumnStrategy::FollowChecked
332 }
333
334 AlterTableOperation::DropColumn {
335 column_name,
336 if_exists,
337 cascade,
338 } => {
339 if cascade {
340 bail_not_implemented!(issue = 6903, "drop column cascade");
341 }
342
343 for column in original_catalog.columns() {
345 if let Some(expr) = column.generated_expr() {
346 let expr = ExprImpl::from_expr_proto(expr)?;
347 let refs = expr.collect_input_refs(original_catalog.columns().len());
348 for idx in refs.ones() {
349 let refed_column = &original_catalog.columns()[idx];
350 if refed_column.name() == column_name.real_value() {
351 bail!(format!(
352 "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
353 column_name,
354 column.name()
355 ))
356 }
357 }
358 }
359 }
360
361 let column_name = column_name.real_value();
363 let removed_column = columns
364 .extract_if(.., |c| c.name.real_value() == column_name)
365 .at_most_one()
366 .ok()
367 .unwrap();
368
369 if removed_column.is_some() {
370 } else if if_exists {
372 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
373 .notice(format!(
374 "column \"{}\" does not exist, skipping",
375 column_name
376 ))
377 .into());
378 } else {
379 Err(ErrorCode::InvalidInputSyntax(format!(
380 "column \"{}\" of table \"{}\" does not exist",
381 column_name, table_name
382 )))?
383 }
384
385 SqlColumnStrategy::FollowUnchecked
386 }
387
388 AlterTableOperation::AlterColumn { column_name, op } => {
389 let AlterColumnOperation::SetDataType {
390 data_type,
391 using: None,
392 } = op
393 else {
394 bail_not_implemented!(issue = 6903, "{op}");
395 };
396
397 let column_name = column_name.real_value();
399 let column = columns
400 .iter_mut()
401 .find(|c| c.name.real_value() == column_name)
402 .ok_or_else(|| {
403 ErrorCode::InvalidInputSyntax(format!(
404 "column \"{}\" of table \"{}\" does not exist",
405 column_name, table_name
406 ))
407 })?;
408
409 column.data_type = Some(data_type);
410
411 SqlColumnStrategy::FollowChecked
412 }
413
414 _ => unreachable!(),
415 };
416 let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
417 &session,
418 table_name,
419 definition,
420 &original_catalog,
421 sql_column_strategy,
422 )
423 .await?;
424
425 let catalog_writer = session.catalog_writer()?;
426
427 catalog_writer
428 .replace_table(source, table, graph, col_index_mapping, job_type)
429 .await?;
430 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
431}
432
433pub fn fetch_table_catalog_for_alter(
434 session: &SessionImpl,
435 table_name: &ObjectName,
436) -> Result<Arc<TableCatalog>> {
437 let db_name = &session.database();
438 let (schema_name, real_table_name) =
439 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
440 let search_path = session.config().search_path();
441 let user_name = &session.user_name();
442
443 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
444
445 let original_catalog = {
446 let reader = session.env().catalog_reader().read_guard();
447 let (table, schema_name) =
448 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
449
450 match table.table_type() {
451 TableType::Table => {}
452
453 _ => Err(ErrorCode::InvalidInputSyntax(format!(
454 "\"{table_name}\" is not a table or cannot be altered"
455 )))?,
456 }
457
458 session.check_privilege_for_drop_alter(schema_name, &**table)?;
459
460 table.clone()
461 };
462
463 Ok(original_catalog)
464}
465
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Adding a column must keep the existing columns (types and IDs) intact, append the new
    /// column with the requested type, and advance the table version and the next column ID by
    /// one step each.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Takes a fresh snapshot of table `t` from the catalog.
        let get_table = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap()
                .0
                .clone()
        };

        let table = get_table();
        let before: HashMap<_, _> = table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        frontend
            .run_sql("alter table t add column s text;")
            .await
            .unwrap();

        let altered_table = get_table();
        let after: HashMap<_, _> = altered_table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Exactly one column was added, and it has the requested type.
        assert_eq!(before.len() + 1, after.len());
        assert_eq!(after["s"].0, DataType::Varchar);

        // Pre-existing columns, including the row ID column, are unchanged.
        for name in ["i", "r", ROW_ID_COLUMN_NAME] {
            assert_eq!(before[name], after[name]);
        }

        // The version and the next column ID each advanced by one step.
        let old_version = table.version.as_ref().unwrap();
        let new_version = altered_table.version.as_ref().unwrap();
        assert_eq!(old_version.version_id + 1, new_version.version_id);
        assert_eq!(
            old_version.next_column_id.next(),
            new_version.next_column_id
        );
    }
}