risingwave_frontend/handler/
alter_table_column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::ColumnCatalog;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::{bail, bail_not_implemented};
25use risingwave_connector::sink::catalog::SinkCatalog;
26use risingwave_pb::catalog::{Source, Table};
27use risingwave_pb::ddl_service::TableJobType;
28use risingwave_pb::stream_plan::stream_node::PbNodeBody;
29use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
30use risingwave_sqlparser::ast::{
31    AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
32};
33
34use super::create_source::SqlColumnStrategy;
35use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
36use super::{HandlerArgs, RwPgResponse};
37use crate::catalog::purify::try_purify_table_source_create_sql_ast;
38use crate::catalog::root_catalog::SchemaPath;
39use crate::catalog::table_catalog::TableType;
40use crate::error::{ErrorCode, Result, RwError};
41use crate::expr::{Expr, ExprImpl, InputRef, Literal};
42use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
43use crate::session::SessionImpl;
44use crate::session::current::notice_to_user;
45use crate::{Binder, TableCatalog};
46
47/// Used in auto schema change process
48pub async fn get_new_table_definition_for_cdc_table(
49    session: &Arc<SessionImpl>,
50    table_name: ObjectName,
51    new_columns: &[ColumnCatalog],
52) -> Result<(Statement, Arc<TableCatalog>)> {
53    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
54
55    assert_eq!(
56        original_catalog.row_id_index, None,
57        "primary key of cdc table must be user defined"
58    );
59
60    // Retrieve the original table definition.
61    let mut definition = original_catalog.create_sql_ast()?;
62
63    // Clear the original columns field, so that we'll follow `new_columns` to generate a
64    // purified definition.
65    {
66        let Statement::CreateTable {
67            columns,
68            constraints,
69            ..
70        } = &mut definition
71        else {
72            panic!("unexpected statement: {:?}", definition);
73        };
74
75        columns.clear();
76        constraints.clear();
77    }
78
79    let new_definition = try_purify_table_source_create_sql_ast(
80        definition,
81        new_columns,
82        None,
83        // The IDs of `new_columns` may not be consistently maintained at this point.
84        // So we use the column names to identify the primary key columns.
85        &original_catalog.pk_column_names(),
86    )?;
87
88    Ok((new_definition, original_catalog))
89}
90
/// Build the plan for replacing a table job in place, as used by `ALTER TABLE`.
///
/// Returns the (optional) associated source, the new table catalog, the new
/// stream fragment graph, the old-to-new column index mapping for downstream
/// jobs, and the table job type.
pub async fn get_replace_table_plan(
    session: &Arc<SessionImpl>,
    table_name: ObjectName,
    new_definition: Statement,
    old_catalog: &Arc<TableCatalog>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(
    Option<Source>,
    Table,
    StreamFragmentGraph,
    ColIndexMapping,
    TableJobType,
)> {
    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
    // Reuse the old catalog's column IDs where possible, so that unchanged
    // columns keep their identity across the replacement.
    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);

    let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        old_catalog,
        handler_args.clone(),
        new_definition,
        col_id_gen,
        sql_column_strategy,
    )
    .await?;

    // Calculate the mapping from the original columns to the new columns.
    // This will be used to map the output of the table in the dispatcher to make
    // existing downstream jobs work correctly.
    let col_index_mapping = ColIndexMapping::new(
        old_catalog
            .columns()
            .iter()
            .map(|old_c| {
                // For each old column, find the position of a new column that
                // is "the same" column; `None` means the old column is dropped.
                table.columns.iter().position(|new_c| {
                    let new_c = new_c.get_column_desc().unwrap();

                    // We consider both the column ID and the data type.
                    // If either of them does not match, we will treat it as a new column.
                    //
                    // TODO: Since we've succeeded in assigning column IDs in the step above,
                    //       the new data type is actually _compatible_ with the old one.
                    //       Theoretically, it's also possible to do some sort of mapping for
                    //       the downstream job to work correctly. However, the current impl
                    //       only supports simple column projection, which we may improve in
                    //       future works.
                    //       However, by treating it as a new column, we can at least reject
                    //       the case where the column with type change is referenced by any
                    //       downstream jobs (because the original column is considered dropped).
                    let id_matches = || new_c.column_id == old_c.column_id().get_id();
                    // Note: this closure emits a user notice as a side effect, and is
                    // only evaluated when the ID already matches (short-circuit below).
                    let type_matches = || {
                        let original_data_type = old_c.data_type();
                        let new_data_type = DataType::from(new_c.column_type.as_ref().unwrap());
                        let matches = original_data_type == &new_data_type;
                        if !matches {
                            notice_to_user(format!("the data type of column \"{}\" has changed, treating as a new column", old_c.name()));
                        }
                        matches
                    };

                    id_matches() && type_matches()
                })
            })
            .collect(),
        table.columns.len(),
    );

    // Sinks created with `CREATE SINK ... INTO <table>`; their union mergers in
    // the new graph must be re-wired against the new column layout below.
    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();

    // The new table's columns, excluding the internal `_rw_timestamp` column
    // which is not part of the sink-visible schema.
    let target_columns = table
        .columns
        .iter()
        .map(|col| ColumnCatalog::from(col.clone()))
        .filter(|col| !col.is_rw_timestamp_column())
        .collect_vec();

    // Patch each incoming sink's merger so it projects onto the new columns.
    for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
        hijack_merger_for_target_table(
            &mut graph,
            &target_columns,
            &sink,
            Some(&sink.unique_identity()),
        )?;
    }

    // Set some fields ourselves so that the meta service does not need to maintain them.
    let mut table = table;
    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();

    Ok((source, table, graph, col_index_mapping, job_type))
}
185
186pub(crate) fn hijack_merger_for_target_table(
187    graph: &mut StreamFragmentGraph,
188    target_columns: &[ColumnCatalog],
189    sink: &SinkCatalog,
190    uniq_identify: Option<&str>,
191) -> Result<()> {
192    let mut sink_columns = sink.original_target_columns.clone();
193    if sink_columns.is_empty() {
194        // This is due to the fact that the value did not exist in earlier versions,
195        // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
196        // Therefore the columns of the table at this point are `original_target_columns`.
197        // This value of sink will be filled on the meta.
198        sink_columns = target_columns.to_vec();
199    }
200
201    let mut i = 0;
202    let mut j = 0;
203    let mut exprs = Vec::new();
204
205    while j < target_columns.len() {
206        if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
207            exprs.push(ExprImpl::InputRef(Box::new(InputRef {
208                data_type: sink_columns[i].data_type().clone(),
209                index: i,
210            })));
211
212            i += 1;
213            j += 1;
214        } else {
215            exprs.push(ExprImpl::Literal(Box::new(Literal::new(
216                None,
217                target_columns[j].data_type().clone(),
218            ))));
219
220            j += 1;
221        }
222    }
223
224    let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
225        select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
226        ..Default::default()
227    }));
228
229    for fragment in graph.fragments.values_mut() {
230        if let Some(node) = &mut fragment.node {
231            insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
232        }
233    }
234
235    Ok(())
236}
237
/// Handle `ALTER TABLE [ADD|DROP] COLUMN` and `ALTER TABLE ... ALTER COLUMN ... SET DATA TYPE`
/// statements. The `operation` must be `AddColumn`, `DropColumn`, or `AlterColumn`
/// with a `SetDataType` sub-operation.
pub async fn handle_alter_table_column(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    operation: AlterTableOperation,
) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

    // Unsupported combination: incoming sinks plus generated columns.
    if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
        return Err(RwError::from(ErrorCode::BindError(
            "Alter a table with incoming sink and generated column has not been implemented."
                .to_owned(),
        )));
    }

    // Webhook tables cannot be altered yet.
    if original_catalog.webhook_info.is_some() {
        return Err(RwError::from(ErrorCode::BindError(
            "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
        )));
    }

    // Retrieve the original table definition and parse it to AST.
    let mut definition = original_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { columns, .. } = &mut definition else {
        panic!("unexpected statement: {:?}", definition);
    };

    // Dropping columns would break sinks writing INTO this table.
    if !original_catalog.incoming_sinks.is_empty()
        && matches!(operation, AlterTableOperation::DropColumn { .. })
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "dropping columns in target table of sinks is not supported".to_owned(),
        ))?;
    }

    // The `sql_column_strategy` will be `FollowChecked` if the operation is `AddColumn`, and
    // `FollowUnchecked` if the operation is `DropColumn`.
    //
    // Consider the following example:
    // - There was a column `foo` and a generated column `gen` that references `foo`.
    // - The external schema is updated to remove `foo`.
    // - The user tries to drop `foo` from the table.
    //
    // Dropping `foo` directly will fail because `gen` references `foo`. However, dropping `gen`
    // first will also be rejected because `foo` does not exist any more. Also, executing
    // `REFRESH SCHEMA` will not help because it keeps the generated column. The user gets stuck.
    //
    // `FollowUnchecked` workarounds this issue. There are also some alternatives:
    // - Allow dropping multiple columns at once.
    // - Check against the persisted schema, instead of resolving again.
    //
    // Applied only to tables with schema registry.
    let sql_column_strategy = match operation {
        AlterTableOperation::AddColumn {
            column_def: new_column,
        } => {
            // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
            // better error reporting.
            let new_column_name = new_column.name.real_value();
            if columns
                .iter()
                .any(|c| c.name.real_value() == new_column_name)
            {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
                )))?
            }

            // Generated columns cannot be added after table creation.
            if new_column
                .options
                .iter()
                .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
            {
                Err(ErrorCode::InvalidInputSyntax(
                    "alter table add generated columns is not supported".to_owned(),
                ))?
            }

            // A NOT NULL column needs a default value so existing rows stay valid.
            if new_column
                .options
                .iter()
                .any(|x| matches!(x.option, ColumnOption::NotNull))
                && !new_column
                    .options
                    .iter()
                    .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
            {
                return Err(ErrorCode::InvalidInputSyntax(
                    "alter table add NOT NULL columns must have default value".to_owned(),
                ))?;
            }

            // Append the new column to the purified table definition.
            columns.push(new_column);

            SqlColumnStrategy::FollowChecked
        }

        AlterTableOperation::DropColumn {
            column_name,
            if_exists,
            cascade,
        } => {
            if cascade {
                bail_not_implemented!(issue = 6903, "drop column cascade");
            }

            // Check if the column to drop is referenced by any generated columns.
            for column in original_catalog.columns() {
                if let Some(expr) = column.generated_expr() {
                    let expr = ExprImpl::from_expr_proto(expr)?;
                    let refs = expr.collect_input_refs(original_catalog.columns().len());
                    for idx in refs.ones() {
                        let refed_column = &original_catalog.columns()[idx];
                        if refed_column.name() == column_name.real_value() {
                            bail!(format!(
                                "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
                                column_name,
                                column.name()
                            ))
                        }
                    }
                }
            }

            // Locate the column by name and remove it.
            // `at_most_one` holds because duplicate column names are rejected at creation.
            let column_name = column_name.real_value();
            let removed_column = columns
                .extract_if(.., |c| c.name.real_value() == column_name)
                .at_most_one()
                .ok()
                .unwrap();

            if removed_column.is_some() {
                // PASS
            } else if if_exists {
                // `DROP COLUMN IF EXISTS` on a missing column is a no-op with a notice.
                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
                    .notice(format!(
                        "column \"{}\" does not exist, skipping",
                        column_name
                    ))
                    .into());
            } else {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{}\" of table \"{}\" does not exist",
                    column_name, table_name
                )))?
            }

            SqlColumnStrategy::FollowUnchecked
        }

        AlterTableOperation::AlterColumn { column_name, op } => {
            // Only `SET DATA TYPE` without a `USING` clause is supported.
            let AlterColumnOperation::SetDataType {
                data_type,
                using: None,
            } = op
            else {
                bail_not_implemented!(issue = 6903, "{op}");
            };

            // Locate the column by name and update its data type.
            let column_name = column_name.real_value();
            let column = columns
                .iter_mut()
                .find(|c| c.name.real_value() == column_name)
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "column \"{}\" of table \"{}\" does not exist",
                        column_name, table_name
                    ))
                })?;

            column.data_type = Some(data_type);

            SqlColumnStrategy::FollowChecked
        }

        // Callers dispatch here only for the operations above.
        _ => unreachable!(),
    };
    // Plan the replacement with the edited definition, then submit it to meta.
    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
        &session,
        table_name,
        definition,
        &original_catalog,
        sql_column_strategy,
    )
    .await?;

    let catalog_writer = session.catalog_writer()?;

    catalog_writer
        .replace_table(source, table, graph, col_index_mapping, job_type)
        .await?;
    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}
436
437pub fn fetch_table_catalog_for_alter(
438    session: &SessionImpl,
439    table_name: &ObjectName,
440) -> Result<Arc<TableCatalog>> {
441    let db_name = &session.database();
442    let (schema_name, real_table_name) =
443        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
444    let search_path = session.config().search_path();
445    let user_name = &session.user_name();
446
447    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
448
449    let original_catalog = {
450        let reader = session.env().catalog_reader().read_guard();
451        let (table, schema_name) =
452            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
453
454        match table.table_type() {
455            TableType::Table => {}
456
457            _ => Err(ErrorCode::InvalidInputSyntax(format!(
458                "\"{table_name}\" is not a table or cannot be altered"
459            )))?,
460        }
461
462        session.check_privilege_for_drop_alter(schema_name, &**table)?;
463
464        table.clone()
465    };
466
467    Ok(original_catalog)
468}
469
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// End-to-end check of `ALTER TABLE ... ADD COLUMN` through the local
    /// frontend: the new column appears with the expected type, existing
    /// columns keep their IDs, and the table version advances.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let sql = "create table t (i int, r real);";
        frontend.run_sql(sql).await.unwrap();

        // Helper to re-fetch the current catalog entry for table `t`.
        let get_table = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap()
                .0
                .clone()
        };

        let table = get_table();

        // Snapshot column name -> (data type, column ID) before the alteration.
        let columns: HashMap<_, _> = table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Alter the table.
        let sql = "alter table t add column s text;";
        frontend.run_sql(sql).await.unwrap();

        let altered_table = get_table();

        let altered_columns: HashMap<_, _> = altered_table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check the new column.
        assert_eq!(columns.len() + 1, altered_columns.len());
        assert_eq!(altered_columns["s"].0, DataType::Varchar);

        // Check the old columns and IDs are not changed.
        assert_eq!(columns["i"], altered_columns["i"]);
        assert_eq!(columns["r"], altered_columns["r"]);
        assert_eq!(
            columns[ROW_ID_COLUMN_NAME],
            altered_columns[ROW_ID_COLUMN_NAME]
        );

        // Check the version is updated.
        assert_eq!(
            table.version.as_ref().unwrap().version_id + 1,
            altered_table.version.as_ref().unwrap().version_id
        );
        assert_eq!(
            table.version.as_ref().unwrap().next_column_id.next(),
            altered_table.version.as_ref().unwrap().next_column_id
        );
    }
}
542}