risingwave_frontend/handler/
alter_table_column.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::ColumnCatalog;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::{bail, bail_not_implemented};
23use risingwave_connector::sink::catalog::SinkCatalog;
24use risingwave_pb::ddl_service::TableJobType;
25use risingwave_pb::stream_plan::stream_node::PbNodeBody;
26use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
27use risingwave_sqlparser::ast::{
28 AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
29};
30
31use super::create_source::SqlColumnStrategy;
32use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
33use super::{HandlerArgs, RwPgResponse};
34use crate::catalog::purify::try_purify_table_source_create_sql_ast;
35use crate::catalog::root_catalog::SchemaPath;
36use crate::catalog::source_catalog::SourceCatalog;
37use crate::catalog::table_catalog::TableType;
38use crate::error::{ErrorCode, Result, RwError};
39use crate::expr::{Expr, ExprImpl, InputRef, Literal};
40use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
41use crate::session::SessionImpl;
42use crate::{Binder, TableCatalog};
43
44pub async fn get_new_table_definition_for_cdc_table(
46 session: &Arc<SessionImpl>,
47 table_name: ObjectName,
48 new_columns: &[ColumnCatalog],
49) -> Result<(Statement, Arc<TableCatalog>)> {
50 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
51
52 assert_eq!(
53 original_catalog.row_id_index, None,
54 "primary key of cdc table must be user defined"
55 );
56
57 let mut definition = original_catalog.create_sql_ast()?;
59
60 {
63 let Statement::CreateTable {
64 columns,
65 constraints,
66 ..
67 } = &mut definition
68 else {
69 panic!("unexpected statement: {:?}", definition);
70 };
71
72 columns.clear();
73 constraints.clear();
74 }
75
76 let new_definition = try_purify_table_source_create_sql_ast(
77 definition,
78 new_columns,
79 None,
80 &original_catalog.pk_column_names(),
83 )?;
84
85 Ok((new_definition, original_catalog))
86}
87
88pub async fn get_replace_table_plan(
89 session: &Arc<SessionImpl>,
90 table_name: ObjectName,
91 new_definition: Statement,
92 old_catalog: &Arc<TableCatalog>,
93 sql_column_strategy: SqlColumnStrategy,
94) -> Result<(
95 Option<SourceCatalog>,
96 TableCatalog,
97 StreamFragmentGraph,
98 TableJobType,
99)> {
100 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
102 let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
103
104 let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
105 session,
106 table_name,
107 old_catalog,
108 handler_args.clone(),
109 new_definition,
110 col_id_gen,
111 sql_column_strategy,
112 )
113 .await?;
114
115 let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
116
117 let target_columns = (table.columns.iter())
118 .filter(|col| !col.is_rw_timestamp_column())
119 .cloned()
120 .collect_vec();
121
122 for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
123 hijack_merger_for_target_table(
124 &mut graph,
125 &target_columns,
126 &sink,
127 Some(&sink.unique_identity()),
128 )?;
129 }
130
131 let mut table = table;
133 table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
134 table.vnode_count = VnodeCount::set(old_catalog.vnode_count());
135
136 Ok((source, table, graph, job_type))
137}
138
139pub(crate) fn hijack_merger_for_target_table(
140 graph: &mut StreamFragmentGraph,
141 target_columns: &[ColumnCatalog],
142 sink: &SinkCatalog,
143 uniq_identify: Option<&str>,
144) -> Result<()> {
145 let mut sink_columns = sink.original_target_columns.clone();
146 if sink_columns.is_empty() {
147 sink_columns = target_columns.to_vec();
152 }
153
154 let mut i = 0;
155 let mut j = 0;
156 let mut exprs = Vec::new();
157
158 while j < target_columns.len() {
159 if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
160 exprs.push(ExprImpl::InputRef(Box::new(InputRef {
161 data_type: sink_columns[i].data_type().clone(),
162 index: i,
163 })));
164
165 i += 1;
166 j += 1;
167 } else {
168 exprs.push(ExprImpl::Literal(Box::new(Literal::new(
169 None,
170 target_columns[j].data_type().clone(),
171 ))));
172
173 j += 1;
174 }
175 }
176
177 let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
178 select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
179 ..Default::default()
180 }));
181
182 for fragment in graph.fragments.values_mut() {
183 if let Some(node) = &mut fragment.node {
184 insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
185 }
186 }
187
188 Ok(())
189}
190
191pub async fn handle_alter_table_column(
194 handler_args: HandlerArgs,
195 table_name: ObjectName,
196 operation: AlterTableOperation,
197) -> Result<RwPgResponse> {
198 let session = handler_args.session;
199 let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
200
201 if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
202 return Err(RwError::from(ErrorCode::BindError(
203 "Alter a table with incoming sink and generated column has not been implemented."
204 .to_owned(),
205 )));
206 }
207
208 if original_catalog.webhook_info.is_some() {
209 return Err(RwError::from(ErrorCode::BindError(
210 "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
211 )));
212 }
213
214 let mut definition = original_catalog.create_sql_ast_purified()?;
216 let Statement::CreateTable { columns, .. } = &mut definition else {
217 panic!("unexpected statement: {:?}", definition);
218 };
219
220 if !original_catalog.incoming_sinks.is_empty()
221 && matches!(operation, AlterTableOperation::DropColumn { .. })
222 {
223 return Err(ErrorCode::InvalidInputSyntax(
224 "dropping columns in target table of sinks is not supported".to_owned(),
225 ))?;
226 }
227
228 let sql_column_strategy = match operation {
246 AlterTableOperation::AddColumn {
247 column_def: new_column,
248 } => {
249 let new_column_name = new_column.name.real_value();
252 if columns
253 .iter()
254 .any(|c| c.name.real_value() == new_column_name)
255 {
256 Err(ErrorCode::InvalidInputSyntax(format!(
257 "column \"{new_column_name}\" of table \"{table_name}\" already exists"
258 )))?
259 }
260
261 if new_column
262 .options
263 .iter()
264 .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
265 {
266 Err(ErrorCode::InvalidInputSyntax(
267 "alter table add generated columns is not supported".to_owned(),
268 ))?
269 }
270
271 if new_column
272 .options
273 .iter()
274 .any(|x| matches!(x.option, ColumnOption::NotNull))
275 && !new_column
276 .options
277 .iter()
278 .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
279 {
280 return Err(ErrorCode::InvalidInputSyntax(
281 "alter table add NOT NULL columns must have default value".to_owned(),
282 ))?;
283 }
284
285 columns.push(new_column);
287
288 SqlColumnStrategy::FollowChecked
289 }
290
291 AlterTableOperation::DropColumn {
292 column_name,
293 if_exists,
294 cascade,
295 } => {
296 if cascade {
297 bail_not_implemented!(issue = 6903, "drop column cascade");
298 }
299
300 for column in original_catalog.columns() {
302 if let Some(expr) = column.generated_expr() {
303 let expr = ExprImpl::from_expr_proto(expr)?;
304 let refs = expr.collect_input_refs(original_catalog.columns().len());
305 for idx in refs.ones() {
306 let refed_column = &original_catalog.columns()[idx];
307 if refed_column.name() == column_name.real_value() {
308 bail!(format!(
309 "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
310 column_name,
311 column.name()
312 ))
313 }
314 }
315 }
316 }
317
318 let column_name = column_name.real_value();
320 let removed_column = columns
321 .extract_if(.., |c| c.name.real_value() == column_name)
322 .at_most_one()
323 .ok()
324 .unwrap();
325
326 if removed_column.is_some() {
327 } else if if_exists {
329 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
330 .notice(format!(
331 "column \"{}\" does not exist, skipping",
332 column_name
333 ))
334 .into());
335 } else {
336 Err(ErrorCode::InvalidInputSyntax(format!(
337 "column \"{}\" of table \"{}\" does not exist",
338 column_name, table_name
339 )))?
340 }
341
342 SqlColumnStrategy::FollowUnchecked
343 }
344
345 AlterTableOperation::AlterColumn { column_name, op } => {
346 let AlterColumnOperation::SetDataType {
347 data_type,
348 using: None,
349 } = op
350 else {
351 bail_not_implemented!(issue = 6903, "{op}");
352 };
353
354 let column_name = column_name.real_value();
356 let column = columns
357 .iter_mut()
358 .find(|c| c.name.real_value() == column_name)
359 .ok_or_else(|| {
360 ErrorCode::InvalidInputSyntax(format!(
361 "column \"{}\" of table \"{}\" does not exist",
362 column_name, table_name
363 ))
364 })?;
365
366 column.data_type = Some(data_type);
367
368 SqlColumnStrategy::FollowChecked
369 }
370
371 _ => unreachable!(),
372 };
373 let (source, table, graph, job_type) = get_replace_table_plan(
374 &session,
375 table_name,
376 definition,
377 &original_catalog,
378 sql_column_strategy,
379 )
380 .await?;
381
382 let catalog_writer = session.catalog_writer()?;
383
384 catalog_writer
385 .replace_table(
386 source.map(|x| x.to_prost()),
387 table.to_prost(),
388 graph,
389 job_type,
390 )
391 .await?;
392 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
393}
394
395pub fn fetch_table_catalog_for_alter(
396 session: &SessionImpl,
397 table_name: &ObjectName,
398) -> Result<Arc<TableCatalog>> {
399 let db_name = &session.database();
400 let (schema_name, real_table_name) =
401 Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
402 let search_path = session.config().search_path();
403 let user_name = &session.user_name();
404
405 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
406
407 let original_catalog = {
408 let reader = session.env().catalog_reader().read_guard();
409 let (table, schema_name) =
410 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
411
412 match table.table_type() {
413 TableType::Table => {}
414
415 _ => Err(ErrorCode::InvalidInputSyntax(format!(
416 "\"{table_name}\" is not a table or cannot be altered"
417 )))?,
418 }
419
420 session.check_privilege_for_drop_alter(schema_name, &**table)?;
421
422 table.clone()
423 };
424
425 Ok(original_catalog)
426}
427
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Adds a column to a freshly created table and checks that the new column shows up with
    /// the expected type, that the pre-existing columns keep their types and column ids, and
    /// that the table version and next column id each advance by one step.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Reads the current catalog entry of `t`.
        let fetch_table = || {
            let reader = session.env().catalog_reader().read_guard();
            let (table, _) = reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap();
            table.clone()
        };

        let before = fetch_table();
        let before_columns: HashMap<_, _> = before
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        frontend
            .run_sql("alter table t add column s text;")
            .await
            .unwrap();

        let after = fetch_table();
        let after_columns: HashMap<_, _> = after
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Exactly one column was added, and it has the requested type.
        assert_eq!(before_columns.len() + 1, after_columns.len());
        assert_eq!(after_columns["s"].0, DataType::Varchar);

        // Existing columns keep both their data type and their column id.
        assert_eq!(before_columns["i"], after_columns["i"]);
        assert_eq!(before_columns["r"], after_columns["r"]);
        assert_eq!(
            before_columns[ROW_ID_COLUMN_NAME],
            after_columns[ROW_ID_COLUMN_NAME]
        );

        // The table version and the next column id both move forward by one.
        let before_version = before.version.as_ref().unwrap();
        let after_version = after.version.as_ref().unwrap();
        assert_eq!(before_version.version_id + 1, after_version.version_id);
        assert_eq!(
            before_version.next_column_id.next(),
            after_version.next_column_id
        );
    }
}