risingwave_frontend/handler/alter_table_column.rs

// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashSet;
use std::sync::Arc;

use itertools::Itertools;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::catalog::ColumnCatalog;
use risingwave_common::hash::VnodeCount;
use risingwave_common::{bail, bail_not_implemented};
use risingwave_connector::sink::catalog::SinkCatalog;
use risingwave_pb::catalog::{Source, Table};
use risingwave_pb::ddl_service::TableJobType;
use risingwave_pb::stream_plan::stream_node::PbNodeBody;
use risingwave_pb::stream_plan::{ProjectNode, StreamFragmentGraph};
use risingwave_sqlparser::ast::{
    AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
};

use super::create_source::SqlColumnStrategy;
use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
use super::{HandlerArgs, RwPgResponse};
use crate::catalog::purify::try_purify_table_source_create_sql_ast;
use crate::catalog::root_catalog::SchemaPath;
use crate::catalog::table_catalog::TableType;
use crate::error::{ErrorCode, Result, RwError};
use crate::expr::{Expr, ExprImpl, InputRef, Literal};
use crate::handler::create_sink::{fetch_incoming_sinks, insert_merger_to_union_with_project};
use crate::session::SessionImpl;
use crate::{Binder, TableCatalog};

/// Used in the auto schema change process.
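/// Rebuilds the `CREATE TABLE` AST of a CDC table from `new_columns`: the original column and
/// constraint lists are discarded and re-derived through purification, while the rest of the
/// original definition (e.g. the `WITH` options) is kept as-is.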
pub async fn get_new_table_definition_for_cdc_table(
    session: &Arc<SessionImpl>,
    table_name: ObjectName,
    new_columns: &[ColumnCatalog],
) -> Result<(Statement, Arc<TableCatalog>)> {
    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

    assert_eq!(
        original_catalog.row_id_index, None,
        "primary key of cdc table must be user defined"
    );

    // Retrieve the original table definition.
    let mut definition = original_catalog.create_sql_ast()?;

    // Clear the original columns field, so that we'll follow `new_columns` to generate a
    // purified definition.
    {
        let Statement::CreateTable {
            columns,
            constraints,
            ..
        } = &mut definition
        else {
            panic!("unexpected statement: {:?}", definition);
        };

        columns.clear();
        constraints.clear();
    }

    let new_definition = try_purify_table_source_create_sql_ast(
        definition,
        new_columns,
        None,
        // The IDs of `new_columns` may not be consistently maintained at this point.
        // So we use the column names to identify the primary key columns.
        &original_catalog.pk_column_names(),
    )?;

    Ok((new_definition, original_catalog))
}

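/// Build everything needed to replace a table with a new definition: generate the new stream
/// graph, re-attach the projections for any incoming sinks, and carry over fields (incoming sink
/// IDs, vnode count) from the old catalog so that the meta service does not need to maintain them.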
pub async fn get_replace_table_plan(
    session: &Arc<SessionImpl>,
    table_name: ObjectName,
    new_definition: Statement,
    old_catalog: &Arc<TableCatalog>,
    sql_column_strategy: SqlColumnStrategy,
) -> Result<(Option<Source>, Table, StreamFragmentGraph, TableJobType)> {
    // Create handler args as if we're creating a new table with the altered definition.
    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);

    let (mut graph, table, source, job_type) = generate_stream_graph_for_replace_table(
        session,
        table_name,
        old_catalog,
        handler_args.clone(),
        new_definition,
        col_id_gen,
        sql_column_strategy,
    )
    .await?;

    let incoming_sink_ids: HashSet<_> = old_catalog.incoming_sinks.iter().copied().collect();

    let target_columns = table
        .columns
        .iter()
        .map(|col| ColumnCatalog::from(col.clone()))
        .filter(|col| !col.is_rw_timestamp_column())
        .collect_vec();

    for sink in fetch_incoming_sinks(session, &incoming_sink_ids)? {
        hijack_merger_for_target_table(
            &mut graph,
            &target_columns,
            &sink,
            Some(&sink.unique_identity()),
        )?;
    }

    // Set some fields ourselves so that the meta service does not need to maintain them.
    let mut table = table;
    table.incoming_sinks = incoming_sink_ids.iter().copied().collect();
    table.maybe_vnode_count = VnodeCount::set(old_catalog.vnode_count()).to_protobuf();

    Ok((source, table, graph, job_type))
}

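/// Adapt the stream graph so that rows produced by `sink` still line up with the (possibly
/// changed) schema of the target table: a `Project` is built that forwards matching sink columns
/// and pads newly added columns with NULLs, and it is attached to the table's union/merger nodes
/// via [`insert_merger_to_union_with_project`].
///
/// Illustrative example (hypothetical schemas): if the sink was created against `(a INT, b VARCHAR)`
/// and the table now has `(a INT, c REAL, b VARCHAR)`, the resulting projection is roughly
/// `[$0, NULL::REAL, $1]`.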
pub(crate) fn hijack_merger_for_target_table(
    graph: &mut StreamFragmentGraph,
    target_columns: &[ColumnCatalog],
    sink: &SinkCatalog,
    uniq_identify: Option<&str>,
) -> Result<()> {
    let mut sink_columns = sink.original_target_columns.clone();
    if sink_columns.is_empty() {
        // The field may be empty because it did not exist in earlier versions, which means no
        // schema change such as `ADD/DROP COLUMN` has been applied to the table since then.
        // Therefore, the table's current columns are exactly the original target columns.
        // The sink's value will be filled in on the meta side.
        sink_columns = target_columns.to_vec();
    }

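    // Build a projection that maps the sink's (possibly older) schema onto the current target
    // schema: `i` walks the sink columns and `j` walks the target columns. When the data types
    // match, the sink column is forwarded as-is; otherwise the target column is padded with a
    // NULL literal.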
    let mut i = 0;
    let mut j = 0;
    let mut exprs = Vec::new();

    while j < target_columns.len() {
        if i < sink_columns.len() && sink_columns[i].data_type() == target_columns[j].data_type() {
            exprs.push(ExprImpl::InputRef(Box::new(InputRef {
                data_type: sink_columns[i].data_type().clone(),
                index: i,
            })));

            i += 1;
            j += 1;
        } else {
            exprs.push(ExprImpl::Literal(Box::new(Literal::new(
                None,
                target_columns[j].data_type().clone(),
            ))));

            j += 1;
        }
    }

    let pb_project = PbNodeBody::Project(Box::new(ProjectNode {
        select_list: exprs.iter().map(|expr| expr.to_expr_proto()).collect(),
        ..Default::default()
    }));

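    // Attach the projection to every union node in the graph, so that data flowing in from this
    // sink's merger is adapted to the new table schema.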
    for fragment in graph.fragments.values_mut() {
        if let Some(node) = &mut fragment.node {
            insert_merger_to_union_with_project(node, &pb_project, uniq_identify);
        }
    }

    Ok(())
}

/// Handle `ALTER TABLE [ADD | DROP | ALTER] COLUMN` statements. The `operation` must be
/// `AddColumn`, `DropColumn`, or `AlterColumn`.
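///
/// Illustrative statements handled here (the exact accepted syntax is defined by the SQL parser):
/// `ALTER TABLE t ADD COLUMN v INT`, `ALTER TABLE t DROP COLUMN v`, and
/// `ALTER TABLE t ALTER COLUMN v SET DATA TYPE DOUBLE PRECISION`.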
pub async fn handle_alter_table_column(
    handler_args: HandlerArgs,
    table_name: ObjectName,
    operation: AlterTableOperation,
) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let original_catalog = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

    if !original_catalog.incoming_sinks.is_empty() && original_catalog.has_generated_column() {
        return Err(RwError::from(ErrorCode::BindError(
            "Altering a table with incoming sinks and generated columns has not been implemented."
                .to_owned(),
        )));
    }

    if original_catalog.webhook_info.is_some() {
        return Err(RwError::from(ErrorCode::BindError(
            "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
        )));
    }

    // Retrieve the original table definition and parse it to AST.
    let mut definition = original_catalog.create_sql_ast_purified()?;
    let Statement::CreateTable { columns, .. } = &mut definition else {
        panic!("unexpected statement: {:?}", definition);
    };

    if !original_catalog.incoming_sinks.is_empty()
        && matches!(operation, AlterTableOperation::DropColumn { .. })
    {
        return Err(ErrorCode::InvalidInputSyntax(
            "dropping columns in target table of sinks is not supported".to_owned(),
        ))?;
    }

    // The `sql_column_strategy` will be `FollowUnchecked` only if the operation is `DropColumn`;
    // both `AddColumn` and `AlterColumn` use `FollowChecked`.
    //
    // Consider the following example:
    // - There was a column `foo` and a generated column `gen` that references `foo`.
    // - The external schema is updated to remove `foo`.
    // - The user tries to drop `foo` from the table.
    //
    // Dropping `foo` directly will fail because `gen` references `foo`. However, dropping `gen`
    // first will also be rejected because `foo` no longer exists. Also, executing
    // `REFRESH SCHEMA` will not help because it keeps the generated column. The user gets stuck.
    //
    // `FollowUnchecked` works around this issue. There are also some alternatives:
    // - Allow dropping multiple columns at once.
    // - Check against the persisted schema, instead of resolving again.
    //
    // Note that the strategy only applies to tables with a schema registry.
    let sql_column_strategy = match operation {
        AlterTableOperation::AddColumn {
            column_def: new_column,
        } => {
            // Duplicated names can actually be caught by `StreamMaterialize`, but we check here
            // for better error reporting.
            let new_column_name = new_column.name.real_value();
            if columns
                .iter()
                .any(|c| c.name.real_value() == new_column_name)
            {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
                )))?
            }

            if new_column
                .options
                .iter()
                .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
            {
                Err(ErrorCode::InvalidInputSyntax(
                    "alter table add generated columns is not supported".to_owned(),
                ))?
            }

            if new_column
                .options
                .iter()
                .any(|x| matches!(x.option, ColumnOption::NotNull))
                && !new_column
                    .options
                    .iter()
                    .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
            {
                return Err(ErrorCode::InvalidInputSyntax(
                    "alter table add NOT NULL columns must have default value".to_owned(),
                ))?;
            }

            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
            columns.push(new_column);

            SqlColumnStrategy::FollowChecked
        }

        AlterTableOperation::DropColumn {
            column_name,
            if_exists,
            cascade,
        } => {
            if cascade {
                bail_not_implemented!(issue = 6903, "drop column cascade");
            }

            // Check if the column to drop is referenced by any generated columns.
            for column in original_catalog.columns() {
                if let Some(expr) = column.generated_expr() {
                    let expr = ExprImpl::from_expr_proto(expr)?;
                    let refs = expr.collect_input_refs(original_catalog.columns().len());
                    for idx in refs.ones() {
                        let refed_column = &original_catalog.columns()[idx];
                        if refed_column.name() == column_name.real_value() {
                            bail!(format!(
                                "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
                                column_name,
                                column.name()
                            ))
                        }
                    }
                }
            }

            // Locate the column by name and remove it.
            let column_name = column_name.real_value();
            let removed_column = columns
                .extract_if(.., |c| c.name.real_value() == column_name)
                .at_most_one()
                .ok()
                .unwrap();

            if removed_column.is_some() {
                // The column was found and removed from the definition; nothing more to do.
            } else if if_exists {
                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
                    .notice(format!(
                        "column \"{}\" does not exist, skipping",
                        column_name
                    ))
                    .into());
            } else {
                Err(ErrorCode::InvalidInputSyntax(format!(
                    "column \"{}\" of table \"{}\" does not exist",
                    column_name, table_name
                )))?
            }

            SqlColumnStrategy::FollowUnchecked
        }

        AlterTableOperation::AlterColumn { column_name, op } => {
            let AlterColumnOperation::SetDataType {
                data_type,
                using: None,
            } = op
            else {
                bail_not_implemented!(issue = 6903, "{op}");
            };

            // Locate the column by name and update its data type.
            let column_name = column_name.real_value();
            let column = columns
                .iter_mut()
                .find(|c| c.name.real_value() == column_name)
                .ok_or_else(|| {
                    ErrorCode::InvalidInputSyntax(format!(
                        "column \"{}\" of table \"{}\" does not exist",
                        column_name, table_name
                    ))
                })?;

            column.data_type = Some(data_type);

            SqlColumnStrategy::FollowChecked
        }

        _ => unreachable!(),
    };
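    // Generate the replace-table plan from the altered definition and submit it to the meta
    // service through the catalog writer.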
    let (source, table, graph, job_type) = get_replace_table_plan(
        &session,
        table_name,
        definition,
        &original_catalog,
        sql_column_strategy,
    )
    .await?;

    let catalog_writer = session.catalog_writer()?;

    catalog_writer
        .replace_table(source, table, graph, job_type)
        .await?;
    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}

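/// Fetch the catalog of the table to be altered, verifying that it is a user table (rather than an
/// index, a materialized view, etc.) and that the current user has the privilege to alter it.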
pub fn fetch_table_catalog_for_alter(
    session: &SessionImpl,
    table_name: &ObjectName,
) -> Result<Arc<TableCatalog>> {
    let db_name = &session.database();
    let (schema_name, real_table_name) =
        Binder::resolve_schema_qualified_name(db_name, table_name.clone())?;
    let search_path = session.config().search_path();
    let user_name = &session.user_name();

    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

    let original_catalog = {
        let reader = session.env().catalog_reader().read_guard();
        let (table, schema_name) =
            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;

        match table.table_type() {
            TableType::Table => {}

            _ => Err(ErrorCode::InvalidInputSyntax(format!(
                "\"{table_name}\" is not a table or cannot be altered"
            )))?,
        }

        session.check_privilege_for_drop_alter(schema_name, &**table)?;

        table.clone()
    };

    Ok(original_catalog)
}

#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        let sql = "create table t (i int, r real);";
        frontend.run_sql(sql).await.unwrap();

        let get_table = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap()
                .0
                .clone()
        };

        let table = get_table();

        let columns: HashMap<_, _> = table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Alter the table.
        let sql = "alter table t add column s text;";
        frontend.run_sql(sql).await.unwrap();

        let altered_table = get_table();

        let altered_columns: HashMap<_, _> = altered_table
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Check the new column.
        assert_eq!(columns.len() + 1, altered_columns.len());
        assert_eq!(altered_columns["s"].0, DataType::Varchar);

        // Check the old columns and IDs are not changed.
        assert_eq!(columns["i"], altered_columns["i"]);
        assert_eq!(columns["r"], altered_columns["r"]);
        assert_eq!(
            columns[ROW_ID_COLUMN_NAME],
            altered_columns[ROW_ID_COLUMN_NAME]
        );

        // Check the version is updated.
        assert_eq!(
            table.version.as_ref().unwrap().version_id + 1,
            altered_table.version.as_ref().unwrap().version_id
        );
        assert_eq!(
            table.version.as_ref().unwrap().next_column_id.next(),
            altered_table.version.as_ref().unwrap().next_column_id
        );
    }
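
    /// A minimal drop-column sketch mirroring `test_add_column_handler` above; the test name and
    /// assertions here are illustrative rather than exhaustive.
    #[tokio::test]
    async fn test_drop_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        let get_table = || {
            let catalog_reader = session.env().catalog_reader().read_guard();
            catalog_reader
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap()
                .0
                .clone()
        };

        let table = get_table();

        // Drop a column and fetch the catalog again.
        frontend
            .run_sql("alter table t drop column r;")
            .await
            .unwrap();

        let altered_table = get_table();

        // Exactly one column should have been removed, and the dropped column should be gone.
        assert_eq!(table.columns.len() - 1, altered_table.columns.len());
        assert!(altered_table.columns.iter().all(|col| col.name() != "r"));
    }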
}