risingwave_frontend/handler/
alter_table_column.rs1use std::sync::Arc;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::{bail, bail_not_implemented};
22use risingwave_pb::ddl_service::TableJobType;
23use risingwave_pb::stream_plan::StreamFragmentGraph;
24use risingwave_sqlparser::ast::{
25 AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
26};
27
28use super::create_source::SqlColumnStrategy;
29use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
30use super::{HandlerArgs, RwPgResponse};
31use crate::catalog::purify::try_purify_table_source_create_sql_ast;
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::table_catalog::TableType;
35use crate::error::{ErrorCode, Result, RwError};
36use crate::expr::ExprImpl;
37use crate::session::SessionImpl;
38use crate::{Binder, TableCatalog};
39
40pub async fn get_new_table_definition_for_cdc_table(
42 original_catalog: Arc<TableCatalog>,
43 new_columns: &[ColumnCatalog],
44) -> Result<Statement> {
45 assert_eq!(
46 original_catalog.row_id_index, None,
47 "primary key of cdc table must be user defined"
48 );
49
50 let mut definition = original_catalog.create_sql_ast()?;
52
53 {
56 let Statement::CreateTable {
57 columns,
58 constraints,
59 ..
60 } = &mut definition
61 else {
62 panic!("unexpected statement: {:?}", definition);
63 };
64
65 columns.clear();
66 constraints.clear();
67 }
68
69 let new_definition = try_purify_table_source_create_sql_ast(
70 definition,
71 new_columns,
72 None,
73 &original_catalog.pk_column_names(),
76 )?;
77
78 Ok(new_definition)
79}
80
81pub async fn get_replace_table_plan(
82 session: &Arc<SessionImpl>,
83 table_name: ObjectName,
84 new_definition: Statement,
85 old_catalog: &Arc<TableCatalog>,
86 sql_column_strategy: SqlColumnStrategy,
87) -> Result<(
88 Option<SourceCatalog>,
89 TableCatalog,
90 StreamFragmentGraph,
91 TableJobType,
92)> {
93 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
95 let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
96
97 let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
98 session,
99 table_name,
100 old_catalog,
101 handler_args.clone(),
102 new_definition,
103 col_id_gen,
104 sql_column_strategy,
105 )
106 .await?;
107
108 let mut table = table;
110 table.vnode_count = VnodeCount::set(old_catalog.vnode_count());
111
112 Ok((source, table, graph, job_type))
113}
114
115pub async fn handle_alter_table_column(
118 handler_args: HandlerArgs,
119 table_name: ObjectName,
120 operation: AlterTableOperation,
121) -> Result<RwPgResponse> {
122 let session = handler_args.session;
123 let (original_catalog, has_incoming_sinks) =
124 fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
125
126 if original_catalog.webhook_info.is_some() {
127 return Err(RwError::from(ErrorCode::BindError(
128 "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
129 )));
130 }
131
132 let mut definition = original_catalog.create_sql_ast_purified()?;
134 let Statement::CreateTable { columns, .. } = &mut definition else {
135 panic!("unexpected statement: {:?}", definition);
136 };
137
138 if has_incoming_sinks && matches!(operation, AlterTableOperation::DropColumn { .. }) {
139 return Err(ErrorCode::InvalidInputSyntax(
140 "dropping columns in target table of sinks is not supported".to_owned(),
141 ))?;
142 }
143
144 let sql_column_strategy = match operation {
162 AlterTableOperation::AddColumn {
163 column_def: new_column,
164 } => {
165 let new_column_name = new_column.name.real_value();
168 if columns
169 .iter()
170 .any(|c| c.name.real_value() == new_column_name)
171 {
172 Err(ErrorCode::InvalidInputSyntax(format!(
173 "column \"{new_column_name}\" of table \"{table_name}\" already exists"
174 )))?
175 }
176
177 if new_column
178 .options
179 .iter()
180 .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
181 {
182 Err(ErrorCode::InvalidInputSyntax(
183 "alter table add generated columns is not supported".to_owned(),
184 ))?
185 }
186
187 if new_column
188 .options
189 .iter()
190 .any(|x| matches!(x.option, ColumnOption::NotNull))
191 && !new_column
192 .options
193 .iter()
194 .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
195 {
196 return Err(ErrorCode::InvalidInputSyntax(
197 "alter table add NOT NULL columns must have default value".to_owned(),
198 ))?;
199 }
200
201 columns.push(new_column);
203
204 SqlColumnStrategy::FollowChecked
205 }
206
207 AlterTableOperation::DropColumn {
208 column_name,
209 if_exists,
210 cascade,
211 } => {
212 if cascade {
213 bail_not_implemented!(issue = 6903, "drop column cascade");
214 }
215
216 for column in original_catalog.columns() {
218 if let Some(expr) = column.generated_expr() {
219 let expr = ExprImpl::from_expr_proto(expr)?;
220 let refs = expr.collect_input_refs(original_catalog.columns().len());
221 for idx in refs.ones() {
222 let refed_column = &original_catalog.columns()[idx];
223 if refed_column.name() == column_name.real_value() {
224 bail!(format!(
225 "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
226 column_name,
227 column.name()
228 ))
229 }
230 }
231 }
232 }
233
234 let column_name = column_name.real_value();
236 let removed_column = columns
237 .extract_if(.., |c| c.name.real_value() == column_name)
238 .at_most_one()
239 .ok()
240 .unwrap();
241
242 if removed_column.is_some() {
243 } else if if_exists {
245 return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
246 .notice(format!(
247 "column \"{}\" does not exist, skipping",
248 column_name
249 ))
250 .into());
251 } else {
252 Err(ErrorCode::InvalidInputSyntax(format!(
253 "column \"{}\" of table \"{}\" does not exist",
254 column_name, table_name
255 )))?
256 }
257
258 SqlColumnStrategy::FollowUnchecked
259 }
260
261 AlterTableOperation::AlterColumn { column_name, op } => {
262 let AlterColumnOperation::SetDataType {
263 data_type,
264 using: None,
265 } = op
266 else {
267 bail_not_implemented!(issue = 6903, "{op}");
268 };
269
270 let column_name = column_name.real_value();
272 let column = columns
273 .iter_mut()
274 .find(|c| c.name.real_value() == column_name)
275 .ok_or_else(|| {
276 ErrorCode::InvalidInputSyntax(format!(
277 "column \"{}\" of table \"{}\" does not exist",
278 column_name, table_name
279 ))
280 })?;
281
282 column.data_type = Some(data_type);
283
284 SqlColumnStrategy::FollowChecked
285 }
286
287 _ => unreachable!(),
288 };
289 let (source, table, graph, job_type) = get_replace_table_plan(
290 &session,
291 table_name,
292 definition,
293 &original_catalog,
294 sql_column_strategy,
295 )
296 .await?;
297
298 let catalog_writer = session.catalog_writer()?;
299
300 catalog_writer
301 .replace_table(
302 source.map(|x| x.to_prost()),
303 table.to_prost(),
304 graph,
305 job_type,
306 )
307 .await?;
308 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
309}
310
311pub fn fetch_table_catalog_for_alter(
312 session: &SessionImpl,
313 table_name: &ObjectName,
314) -> Result<(Arc<TableCatalog>, bool)> {
315 let db_name = &session.database();
316 let (schema_name, real_table_name) =
317 Binder::resolve_schema_qualified_name(db_name, table_name)?;
318 let search_path = session.config().search_path();
319 let user_name = &session.user_name();
320
321 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
322
323 {
324 let reader = session.env().catalog_reader().read_guard();
325 let (table, schema_name) =
326 reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
327
328 match table.table_type() {
329 TableType::Table => {}
330
331 _ => Err(ErrorCode::InvalidInputSyntax(format!(
332 "\"{table_name}\" is not a table or cannot be altered"
333 )))?,
334 }
335
336 session.check_privilege_for_drop_alter(schema_name, &**table)?;
337
338 let has_incoming_sinks = reader
339 .get_schema_by_id(&table.database_id, &table.schema_id)?
340 .table_incoming_sinks(table.id)
341 .map(|sinks| !sinks.is_empty())
342 .unwrap_or(false);
343
344 Ok((table.clone(), has_incoming_sinks))
345 }
346}
347
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Adds a column to a freshly created table and checks that the existing columns keep
    /// their type and column ID, and that the table version and next column ID each advance
    /// by one step.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Fetches the current catalog of table `t`.
        let get_table = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            let (found, _) = catalog_reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap();
            found.clone()
        };

        let table = get_table();

        // Column name -> (data type, column ID) before the alteration.
        let columns: HashMap<_, _> = table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        frontend
            .run_sql("alter table t add column s text;")
            .await
            .unwrap();

        let altered_table = get_table();

        // Column name -> (data type, column ID) after the alteration.
        let altered_columns: HashMap<_, _> = altered_table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Exactly one column was added, and it has the expected type.
        assert_eq!(columns.len() + 1, altered_columns.len());
        assert_eq!(altered_columns["s"].0, DataType::Varchar);

        // Pre-existing columns are unchanged.
        for name in ["i", "r", ROW_ID_COLUMN_NAME] {
            assert_eq!(columns[name], altered_columns[name]);
        }

        let old_version = table.version.as_ref().unwrap();
        let new_version = altered_table.version.as_ref().unwrap();

        assert_eq!(old_version.version_id + 1, new_version.version_id);
        assert_eq!(
            old_version.next_column_id.next(),
            new_version.next_column_id
        );
    }
}