risingwave_frontend/handler/
alter_table_column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::{bail, bail_not_implemented};
22use risingwave_pb::ddl_service::TableJobType;
23use risingwave_pb::stream_plan::StreamFragmentGraph;
24use risingwave_sqlparser::ast::{
25    AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
26};
27
28use super::create_source::SqlColumnStrategy;
29use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
30use super::{HandlerArgs, RwPgResponse};
31use crate::catalog::purify::try_purify_table_source_create_sql_ast;
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::table_catalog::TableType;
35use crate::error::{ErrorCode, Result, RwError};
36use crate::expr::ExprImpl;
37use crate::session::SessionImpl;
38use crate::{Binder, TableCatalog};
39
40/// Used in auto schema change process
41pub async fn get_new_table_definition_for_cdc_table(
42    session: &Arc<SessionImpl>,
43    table_name: ObjectName,
44    new_columns: &[ColumnCatalog],
45) -> Result<(Statement, Arc<TableCatalog>)> {
46    let (original_catalog, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
47
48    assert_eq!(
49        original_catalog.row_id_index, None,
50        "primary key of cdc table must be user defined"
51    );
52
53    // Retrieve the original table definition.
54    let mut definition = original_catalog.create_sql_ast()?;
55
56    // Clear the original columns field, so that we'll follow `new_columns` to generate a
57    // purified definition.
58    {
59        let Statement::CreateTable {
60            columns,
61            constraints,
62            ..
63        } = &mut definition
64        else {
65            panic!("unexpected statement: {:?}", definition);
66        };
67
68        columns.clear();
69        constraints.clear();
70    }
71
72    let new_definition = try_purify_table_source_create_sql_ast(
73        definition,
74        new_columns,
75        None,
76        // The IDs of `new_columns` may not be consistently maintained at this point.
77        // So we use the column names to identify the primary key columns.
78        &original_catalog.pk_column_names(),
79    )?;
80
81    Ok((new_definition, original_catalog))
82}
83
84pub async fn get_replace_table_plan(
85    session: &Arc<SessionImpl>,
86    table_name: ObjectName,
87    new_definition: Statement,
88    old_catalog: &Arc<TableCatalog>,
89    sql_column_strategy: SqlColumnStrategy,
90) -> Result<(
91    Option<SourceCatalog>,
92    TableCatalog,
93    StreamFragmentGraph,
94    TableJobType,
95)> {
96    // Create handler args as if we're creating a new table with the altered definition.
97    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
98    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
99
100    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
101        session,
102        table_name,
103        old_catalog,
104        handler_args.clone(),
105        new_definition,
106        col_id_gen,
107        sql_column_strategy,
108    )
109    .await?;
110
111    // Set some fields ourselves so that the meta service does not need to maintain them.
112    let mut table = table;
113    table.vnode_count = VnodeCount::set(old_catalog.vnode_count());
114
115    Ok((source, table, graph, job_type))
116}
117
118/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
119/// `DropColumn`.
120pub async fn handle_alter_table_column(
121    handler_args: HandlerArgs,
122    table_name: ObjectName,
123    operation: AlterTableOperation,
124) -> Result<RwPgResponse> {
125    let session = handler_args.session;
126    let (original_catalog, has_incoming_sinks) =
127        fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
128
129    if original_catalog.webhook_info.is_some() {
130        return Err(RwError::from(ErrorCode::BindError(
131            "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
132        )));
133    }
134
135    // Retrieve the original table definition and parse it to AST.
136    let mut definition = original_catalog.create_sql_ast_purified()?;
137    let Statement::CreateTable { columns, .. } = &mut definition else {
138        panic!("unexpected statement: {:?}", definition);
139    };
140
141    if has_incoming_sinks && matches!(operation, AlterTableOperation::DropColumn { .. }) {
142        return Err(ErrorCode::InvalidInputSyntax(
143            "dropping columns in target table of sinks is not supported".to_owned(),
144        ))?;
145    }
146
147    // The `sql_column_strategy` will be `FollowChecked` if the operation is `AddColumn`, and
148    // `FollowUnchecked` if the operation is `DropColumn`.
149    //
150    // Consider the following example:
151    // - There was a column `foo` and a generated column `gen` that references `foo`.
152    // - The external schema is updated to remove `foo`.
153    // - The user tries to drop `foo` from the table.
154    //
155    // Dropping `foo` directly will fail because `gen` references `foo`. However, dropping `gen`
156    // first will also be rejected because `foo` does not exist any more. Also, executing
157    // `REFRESH SCHEMA` will not help because it keeps the generated column. The user gets stuck.
158    //
159    // `FollowUnchecked` workarounds this issue. There are also some alternatives:
160    // - Allow dropping multiple columns at once.
161    // - Check against the persisted schema, instead of resolving again.
162    //
163    // Applied only to tables with schema registry.
164    let sql_column_strategy = match operation {
165        AlterTableOperation::AddColumn {
166            column_def: new_column,
167        } => {
168            // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
169            // better error reporting.
170            let new_column_name = new_column.name.real_value();
171            if columns
172                .iter()
173                .any(|c| c.name.real_value() == new_column_name)
174            {
175                Err(ErrorCode::InvalidInputSyntax(format!(
176                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
177                )))?
178            }
179
180            if new_column
181                .options
182                .iter()
183                .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
184            {
185                Err(ErrorCode::InvalidInputSyntax(
186                    "alter table add generated columns is not supported".to_owned(),
187                ))?
188            }
189
190            if new_column
191                .options
192                .iter()
193                .any(|x| matches!(x.option, ColumnOption::NotNull))
194                && !new_column
195                    .options
196                    .iter()
197                    .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
198            {
199                return Err(ErrorCode::InvalidInputSyntax(
200                    "alter table add NOT NULL columns must have default value".to_owned(),
201                ))?;
202            }
203
204            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
205            columns.push(new_column);
206
207            SqlColumnStrategy::FollowChecked
208        }
209
210        AlterTableOperation::DropColumn {
211            column_name,
212            if_exists,
213            cascade,
214        } => {
215            if cascade {
216                bail_not_implemented!(issue = 6903, "drop column cascade");
217            }
218
219            // Check if the column to drop is referenced by any generated columns.
220            for column in original_catalog.columns() {
221                if let Some(expr) = column.generated_expr() {
222                    let expr = ExprImpl::from_expr_proto(expr)?;
223                    let refs = expr.collect_input_refs(original_catalog.columns().len());
224                    for idx in refs.ones() {
225                        let refed_column = &original_catalog.columns()[idx];
226                        if refed_column.name() == column_name.real_value() {
227                            bail!(format!(
228                                "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
229                                column_name,
230                                column.name()
231                            ))
232                        }
233                    }
234                }
235            }
236
237            // Locate the column by name and remove it.
238            let column_name = column_name.real_value();
239            let removed_column = columns
240                .extract_if(.., |c| c.name.real_value() == column_name)
241                .at_most_one()
242                .ok()
243                .unwrap();
244
245            if removed_column.is_some() {
246                // PASS
247            } else if if_exists {
248                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
249                    .notice(format!(
250                        "column \"{}\" does not exist, skipping",
251                        column_name
252                    ))
253                    .into());
254            } else {
255                Err(ErrorCode::InvalidInputSyntax(format!(
256                    "column \"{}\" of table \"{}\" does not exist",
257                    column_name, table_name
258                )))?
259            }
260
261            SqlColumnStrategy::FollowUnchecked
262        }
263
264        AlterTableOperation::AlterColumn { column_name, op } => {
265            let AlterColumnOperation::SetDataType {
266                data_type,
267                using: None,
268            } = op
269            else {
270                bail_not_implemented!(issue = 6903, "{op}");
271            };
272
273            // Locate the column by name and update its data type.
274            let column_name = column_name.real_value();
275            let column = columns
276                .iter_mut()
277                .find(|c| c.name.real_value() == column_name)
278                .ok_or_else(|| {
279                    ErrorCode::InvalidInputSyntax(format!(
280                        "column \"{}\" of table \"{}\" does not exist",
281                        column_name, table_name
282                    ))
283                })?;
284
285            column.data_type = Some(data_type);
286
287            SqlColumnStrategy::FollowChecked
288        }
289
290        _ => unreachable!(),
291    };
292    let (source, table, graph, job_type) = get_replace_table_plan(
293        &session,
294        table_name,
295        definition,
296        &original_catalog,
297        sql_column_strategy,
298    )
299    .await?;
300
301    let catalog_writer = session.catalog_writer()?;
302
303    catalog_writer
304        .replace_table(
305            source.map(|x| x.to_prost()),
306            table.to_prost(),
307            graph,
308            job_type,
309        )
310        .await?;
311    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
312}
313
314pub fn fetch_table_catalog_for_alter(
315    session: &SessionImpl,
316    table_name: &ObjectName,
317) -> Result<(Arc<TableCatalog>, bool)> {
318    let db_name = &session.database();
319    let (schema_name, real_table_name) =
320        Binder::resolve_schema_qualified_name(db_name, table_name)?;
321    let search_path = session.config().search_path();
322    let user_name = &session.user_name();
323
324    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
325
326    {
327        let reader = session.env().catalog_reader().read_guard();
328        let (table, schema_name) =
329            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
330
331        match table.table_type() {
332            TableType::Table => {}
333
334            _ => Err(ErrorCode::InvalidInputSyntax(format!(
335                "\"{table_name}\" is not a table or cannot be altered"
336            )))?,
337        }
338
339        session.check_privilege_for_drop_alter(schema_name, &**table)?;
340
341        let has_incoming_sinks = reader
342            .get_schema_by_id(&table.database_id, &table.schema_id)?
343            .table_incoming_sinks(table.id)
344            .map(|sinks| !sinks.is_empty())
345            .unwrap_or(false);
346
347        Ok((table.clone(), has_incoming_sinks))
348    }
349}
350
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
    };
    use risingwave_common::types::DataType;

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::LocalFrontend;

    /// Adding a column must keep the existing columns and their IDs intact, and bump both the
    /// table version and the next column ID.
    #[tokio::test]
    async fn test_add_column_handler() {
        let frontend = LocalFrontend::new(Default::default()).await;
        let session = frontend.session_ref();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        frontend
            .run_sql("create table t (i int, r real);")
            .await
            .unwrap();

        // Fetches the current catalog of table `t`.
        let fetch_table = || {
            let guard = session.env().catalog_reader().read_guard();
            let (table, _) = guard
                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
                .unwrap();
            table.clone()
        };

        // Snapshot of the columns (name -> (type, ID)) before the alteration.
        let before = fetch_table();
        let before_columns: HashMap<_, _> = before
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Alter the table.
        frontend
            .run_sql("alter table t add column s text;")
            .await
            .unwrap();

        // Snapshot of the columns after the alteration.
        let after = fetch_table();
        let after_columns: HashMap<_, _> = after
            .columns
            .iter()
            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
            .collect();

        // Exactly one column was added, and it has the expected type.
        assert_eq!(before_columns.len() + 1, after_columns.len());
        assert_eq!(after_columns["s"].0, DataType::Varchar);

        // The pre-existing columns keep their types and IDs.
        assert_eq!(before_columns["i"], after_columns["i"]);
        assert_eq!(before_columns["r"], after_columns["r"]);
        assert_eq!(
            before_columns[ROW_ID_COLUMN_NAME],
            after_columns[ROW_ID_COLUMN_NAME]
        );

        // The table version and the next column ID both advance by one step.
        let old_version = before.version.as_ref().unwrap();
        let new_version = after.version.as_ref().unwrap();
        assert_eq!(old_version.version_id + 1, new_version.version_id);
        assert_eq!(old_version.next_column_id.next(), new_version.next_column_id);
    }
}