risingwave_frontend/handler/
alter_table_column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use itertools::Itertools;
19use pgwire::pg_response::{PgResponse, StatementType};
20use risingwave_common::catalog::ColumnCatalog;
21use risingwave_common::hash::VnodeCount;
22use risingwave_common::types::DataType;
23use risingwave_common::util::column_index_mapping::ColIndexMapping;
24use risingwave_common::{bail, bail_not_implemented};
25use risingwave_connector::sink::catalog::SinkCatalog;
26use risingwave_pb::catalog::{Source, Table};
27use risingwave_pb::ddl_service::TableJobType;
28use risingwave_pb::stream_plan::stream_node::PbNodeBody;
29use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
30use risingwave_sqlparser::ast::{
31    AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
32};
33
34use super::create_source::SqlColumnStrategy;
35use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
36use super::{HandlerArgs, RwPgResponse};
37use crate::catalog::purify::try_purify_table_source_create_sql_ast;
38use crate::catalog::root_catalog::SchemaPath;
39use crate::catalog::table_catalog::TableType;
40use crate::error::{ErrorCode, Result, RwError};
41use crate::expr::{Expr, ExprImpl, InputRef, Literal};
42use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
43use crate::session::SessionImpl;
44use crate::{Binder, TableCatalog};
45
46/// Used in auto schema change process
47pub async fn get_new_table_definition_for_cdc_table(
48    session: &Arc<SessionImpl>,
49    table_name: ObjectName,
50    new_columns: &[ColumnCatalog],
51) -> Result<(Statement, Arc<TableCatalog>)> {
52    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
53
54    assert_eq!(
55        original_catalog.row_id_index, None,
56        "primary key of cdc table must be user defined"
57    );
58
59    // Retrieve the original table definition.
60    let mut definition = original_catalog.create_sql_ast()?;
61
62    // Clear the original columns field, so that we'll follow `new_columns` to generate a
63    // purified definition.
64    {
65        let Statement::CreateTable {
66            columns,
67            constraints,
68            ..
69        } = &mut definition
70        else {
71            panic!("unexpected statement: {:?}", definition);
72        };
73
74        columns.clear();
75        constraints.clear();
76    }
77
78    let new_definition = try_purify_table_source_create_sql_ast(
79        definition,
80        new_columns,
81        None,
82        // The IDs of `new_columns` may not be consistently maintained at this point.
83        // So we use the column names to identify the primary key columns.
84        &original_catalog.pk_column_names(),
85    )?;
86
87    Ok((new_definition, original_catalog))
88}
89
90pub async fn get_replace_table_plan(
91    session: &Arc<SessionImpl>,
92    table_name: ObjectName,
93    new_definition: Statement,
94    old_catalog: &Arc<TableCatalog>,
95    sql_column_strategy: SqlColumnStrategy,
96) -> Result<(
97    Option<Source>,
98    Table,
99    StreamFragmentGraph,
100    ColIndexMapping,
101    TableJobType,
102)> {
103    // Create handler args as if we're creating a new table with the altered definition.
104    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
105    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
106
107    let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
108        session,
109        table_name,
110        old_catalog,
111        handler_args.clone(),
112        new_definition,
113        col_id_gen,
114        sql_column_strategy,
115    )
116    .await?;
117
118    // Calculate the mapping from the original columns to the new columns.
119    //
120    // Note: Previously, this will be used to map the output of the table in the dispatcher to make
121    // existing downstream jobs work correctly. This is no longer the case. We will generate mapping
122    // directly in the meta service by checking the new schema of this table and all downstream jobs,
123    // which simplifies handling `ALTER TABLE ALTER COLUMN TYPE`.
124    //
125    // TODO: However, we still generate this mapping and use it for rewriting downstream indexes'
126    // `index_item`. We should consider completely removing this in future works.
127    let col_index_mapping = ColIndexMapping::new(
128        old_catalog
129            .columns()
130            .iter()
131            .map(|old_c| {
132                table.columns.iter().position(|new_c| {
133                    let new_c = new_c.get_column_desc().unwrap();
134
135                    // We consider both the column ID and the data type.
136                    // If either of them does not match, we will treat it as a different column.
137                    //
138                    // Note that this does not hurt the ability for `ALTER TABLE ALTER COLUMN TYPE`,
139                    // because we don't rely on this mapping in dispatcher. However, if there's an
140                    // index on the column, currently it will fail to rewrite as the column is
141                    // considered as if it's dropped.
142                    let id_matches = || new_c.column_id == old_c.column_id().get_id();
143                    let type_matches = || {
144                        let original_data_type = old_c.data_type();
145                        let new_data_type = DataType::from(new_c.column_type.as_ref().unwrap());
146                        original_data_type == &new_data_type
147                    };
148
149                    id_matches() && type_matches()
150                })
151            })
152            .collect(),
153        table.columns.len(),
154    );
155
156    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();
157
158    let target_columns = table
159        .columns
160        .iter()
161        .map(|col| ColumnCatalog::from(col.clone()))
162        .filter(|col| !col.is_rw_timestamp_column())
163        .collect_vec();
164
165    for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
166        hijack_merger_for_target_table(
167            &mut graph,
168            &target_columns,
169            &sink,
170            Some(&sink.unique_identity()),
171        )?;
172    }
173
174    // Set some fields ourselves so that the meta service does not need to maintain them.
175    let mut table = table;
176    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
177    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();
178
179    Ok((source, table, graph, col_index_mapping, job_type))
180}
181
182pub(crate) fn hijack_merger_for_target_table(
183    graph: &mut StreamFragmentGraph,
184    target_columns: &[ColumnCatalog],
185    sink: &SinkCatalog,
186    uniq_identify: Option<&str>,
187) -> Result<()> {
188    let mut sink_columns = sink.original_target_columns.clone();
189    if sink_columns.is_empty() {
190        // This is due to the fact that the value did not exist in earlier versions,
191        // which means no schema changes such as `ADD/DROP COLUMN` have been made to the table.
192        // Therefore the columns of the table at this point are `original_target_columns`.
193        // This value of sink will be filled on the meta.
194        sink_columns = target_columns.to_vec();
195    }
196
197    let mut i = 0;
198    let mut j = 0;
199    let mut exprs = Vec::new();
200
201    while j < target_columns.len() {
202        if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
203            exprs.push(ExprImpl::InputRef(Box::new(InputRef {
204                data_type: sink_columns[i].data_type().clone(),
205                index: i,
206            })));
207
208            i += 1;
209            j += 1;
210        } else {
211            exprs.push(ExprImpl::Literal(Box::new(Literal::new(
212                None,
213                target_columns[j].data_type().clone(),
214            ))));
215
216            j += 1;
217        }
218    }
219
220    let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
221        select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
222        ..Default::default()
223    }));
224
225    for fragment in graph.fragments.values_mut() {
226        if let Some(node) = &mut fragment.node {
227            insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
228        }
229    }
230
231    Ok(())
232}
233
234/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
235/// `DropColumn`.
236pub async fn handle_alter_table_column(
237    handler_args: HandlerArgs,
238    table_name: ObjectName,
239    operation: AlterTableOperation,
240) -> Result<RwPgResponse> {
241    let session = handler_args.session;
242    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
243
244    if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
245        return Err(RwError::from(ErrorCode::BindError(
246            "Alter a table with incoming sink and generated column has not been implemented."
247                .to_owned(),
248        )));
249    }
250
251    if original_catalog.webhook_info.is_some() {
252        return Err(RwError::from(ErrorCode::BindError(
253            "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
254        )));
255    }
256
257    // Retrieve the original table definition and parse it to AST.
258    let mut definition = original_catalog.create_sql_ast_purified()?;
259    let Statement::CreateTable { columns, .. } = &mut definition else {
260        panic!("unexpected statement: {:?}", definition);
261    };
262
263    if !original_catalog.incoming_sinks.is_empty()
264        && matches!(operation, AlterTableOperation::DropColumn { .. })
265    {
266        return Err(ErrorCode::InvalidInputSyntax(
267            "dropping columns in target table of sinks is not supported".to_owned(),
268        ))?;
269    }
270
271    // The `sql_column_strategy` will be `FollowChecked` if the operation is `AddColumn`, and
272    // `FollowUnchecked` if the operation is `DropColumn`.
273    //
274    // Consider the following example:
275    // - There was a column `foo` and a generated column `gen` that references `foo`.
276    // - The external schema is updated to remove `foo`.
277    // - The user tries to drop `foo` from the table.
278    //
279    // Dropping `foo` directly will fail because `gen` references `foo`. However, dropping `gen`
280    // first will also be rejected because `foo` does not exist any more. Also, executing
281    // `REFRESH SCHEMA` will not help because it keeps the generated column. The user gets stuck.
282    //
283    // `FollowUnchecked` workarounds this issue. There are also some alternatives:
284    // - Allow dropping multiple columns at once.
285    // - Check against the persisted schema, instead of resolving again.
286    //
287    // Applied only to tables with schema registry.
288    let sql_column_strategy = match operation {
289        AlterTableOperation::AddColumn {
290            column_def: new_column,
291        } => {
292            // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
293            // better error reporting.
294            let new_column_name = new_column.name.real_value();
295            if columns
296                .iter()
297                .any(|c| c.name.real_value() == new_column_name)
298            {
299                Err(ErrorCode::InvalidInputSyntax(format!(
300                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
301                )))?
302            }
303
304            if new_column
305                .options
306                .iter()
307                .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
308            {
309                Err(ErrorCode::InvalidInputSyntax(
310                    "alter table add generated columns is not supported".to_owned(),
311                ))?
312            }
313
314            if new_column
315                .options
316                .iter()
317                .any(|x| matches!(x.option, ColumnOption::NotNull))
318                && !new_column
319                    .options
320                    .iter()
321                    .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
322            {
323                return Err(ErrorCode::InvalidInputSyntax(
324                    "alter table add NOT NULL columns must have default value".to_owned(),
325                ))?;
326            }
327
328            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
329            columns.push(new_column);
330
331            SqlColumnStrategy::FollowChecked
332        }
333
334        AlterTableOperation::DropColumn {
335            column_name,
336            if_exists,
337            cascade,
338        } => {
339            if cascade {
340                bail_not_implemented!(issue = 6903, "drop column cascade");
341            }
342
343            // Check if the column to drop is referenced by any generated columns.
344            for column in original_catalog.columns() {
345                if let Some(expr) = column.generated_expr() {
346                    let expr = ExprImpl::from_expr_proto(expr)?;
347                    let refs = expr.collect_input_refs(original_catalog.columns().len());
348                    for idx in refs.ones() {
349                        let refed_column = &original_catalog.columns()[idx];
350                        if refed_column.name() == column_name.real_value() {
351                            bail!(format!(
352                                "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
353                                column_name,
354                                column.name()
355                            ))
356                        }
357                    }
358                }
359            }
360
361            // Locate the column by name and remove it.
362            let column_name = column_name.real_value();
363            let removed_column = columns
364                .extract_if(.., |c| c.name.real_value() == column_name)
365                .at_most_one()
366                .ok()
367                .unwrap();
368
369            if removed_column.is_some() {
370                // PASS
371            } else if if_exists {
372                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
373                    .notice(format!(
374                        "column \"{}\" does not exist, skipping",
375                        column_name
376                    ))
377                    .into());
378            } else {
379                Err(ErrorCode::InvalidInputSyntax(format!(
380                    "column \"{}\" of table \"{}\" does not exist",
381                    column_name, table_name
382                )))?
383            }
384
385            SqlColumnStrategy::FollowUnchecked
386        }
387
388        AlterTableOperation::AlterColumn { column_name, op } => {
389            let AlterColumnOperation::SetDataType {
390                data_type,
391                using: None,
392            } = op
393            else {
394                bail_not_implemented!(issue = 6903, "{op}");
395            };
396
397            // Locate the column by name and update its data type.
398            let column_name = column_name.real_value();
399            let column = columns
400                .iter_mut()
401                .find(|c| c.name.real_value() == column_name)
402                .ok_or_else(|| {
403                    ErrorCode::InvalidInputSyntax(format!(
404                        "column \"{}\" of table \"{}\" does not exist",
405                        column_name, table_name
406                    ))
407                })?;
408
409            column.data_type = Some(data_type);
410
411            SqlColumnStrategy::FollowChecked
412        }
413
414        _ => unreachable!(),
415    };
416    let (source, table, graph, col_index_mapping, job_type) = get_replace_table_plan(
417        &session,
418        table_name,
419        definition,
420        &original_catalog,
421        sql_column_strategy,
422    )
423    .await?;
424
425    let catalog_writer = session.catalog_writer()?;
426
427    catalog_writer
428        .replace_table(source, table, graph, col_index_mapping, job_type)
429        .await?;
430    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
431}
432
433pub fn fetch_table_catalog_for_alter(
434    session: &SessionImpl,
435    table_name: &ObjectName,
436) -> Result<Arc<TableCatalog>> {
437    let db_name = &session.database();
438    let (schema_name, real_table_name) =
439        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
440    let search_path = session.config().search_path();
441    let user_name = &session.user_name();
442
443    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
444
445    let original_catalog = {
446        let reader = session.env().catalog_reader().read_guard();
447        let (table, schema_name) =
448            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
449
450        match table.table_type() {
451            TableType::Table => {}
452
453            _ => Err(ErrorCode::InvalidInputSyntax(format!(
454                "\"{table_name}\" is not a table or cannot be altered"
455            )))?,
456        }
457
458        session.check_privilege_for_drop_alter(schema_name, &**table)?;
459
460        table.clone()
461    };
462
463    Ok(original_catalog)
464}
465
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// `ALTER TABLE ... ADD COLUMN` adds exactly one column, keeps the existing columns and
    /// their IDs, and advances both the table version and the next column ID.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Fetches the current catalog of table `t`.
        let get_table = || {
            let reader = session.env().catalog_reader().read_guard();
            let (table, _) = reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap();
            table.clone()
        };

        // Snapshot of the columns before the change: name -> (data type, column ID).
        let before = get_table();
        let before_columns: HashMap<_, _> = before
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Alter the table.
        frontend
            .run_sql("alter table t add column s text;")
            .await
            .unwrap();

        // Snapshot of the columns after the change.
        let after = get_table();
        let after_columns: HashMap<_, _> = after
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check the new column.
        assert_eq!(before_columns.len() + 1, after_columns.len());
        assert_eq!(after_columns["s"].0, DataType::Varchar);

        // Check the old columns and IDs are not changed.
        assert_eq!(before_columns["i"], after_columns["i"]);
        assert_eq!(before_columns["r"], after_columns["r"]);
        assert_eq!(
            before_columns[ROW_ID_COLUMN_NAME],
            after_columns[ROW_ID_COLUMN_NAME]
        );

        // Check the version is updated.
        let old_version = before.version.as_ref().unwrap();
        let new_version = after.version.as_ref().unwrap();
        assert_eq!(old_version.version_id + 1, new_version.version_id);
        assert_eq!(
            old_version.next_column_id.next(),
            new_version.next_column_id
        );
    }
}