risingwave_frontend/handler/
alter_table_column.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use itertools::Itertools;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::ColumnCatalog;
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::{bail, bail_not_implemented};
22use risingwave_pb::ddl_service::TableJobType;
23use risingwave_pb::stream_plan::StreamFragmentGraph;
24use risingwave_sqlparser::ast::{
25    AlterColumnOperation, AlterTableOperation, ColumnOption, ObjectName, Statement,
26};
27
28use super::create_source::SqlColumnStrategy;
29use super::create_table::{ColumnIdGenerator, generate_stream_graph_for_replace_table};
30use super::{HandlerArgs, RwPgResponse};
31use crate::catalog::purify::try_purify_table_source_create_sql_ast;
32use crate::catalog::root_catalog::SchemaPath;
33use crate::catalog::source_catalog::SourceCatalog;
34use crate::catalog::table_catalog::TableType;
35use crate::error::{ErrorCode, Result, RwError};
36use crate::expr::ExprImpl;
37use crate::session::SessionImpl;
38use crate::{Binder, TableCatalog};
39
40/// Used in auto schema change process
41pub async fn get_new_table_definition_for_cdc_table(
42    original_catalog: Arc<TableCatalog>,
43    new_columns: &[ColumnCatalog],
44) -> Result<Statement> {
45    assert_eq!(
46        original_catalog.row_id_index, None,
47        "primary key of cdc table must be user defined"
48    );
49
50    // Retrieve the original table definition.
51    let mut definition = original_catalog.create_sql_ast()?;
52
53    // Clear the original columns field, so that we'll follow `new_columns` to generate a
54    // purified definition.
55    {
56        let Statement::CreateTable {
57            columns,
58            constraints,
59            ..
60        } = &mut definition
61        else {
62            panic!("unexpected statement: {:?}", definition);
63        };
64
65        columns.clear();
66        constraints.clear();
67    }
68
69    let new_definition = try_purify_table_source_create_sql_ast(
70        definition,
71        new_columns,
72        None,
73        // The IDs of `new_columns` may not be consistently maintained at this point.
74        // So we use the column names to identify the primary key columns.
75        &original_catalog.pk_column_names(),
76    )?;
77
78    Ok(new_definition)
79}
80
81pub async fn get_replace_table_plan(
82    session: &Arc<SessionImpl>,
83    table_name: ObjectName,
84    new_definition: Statement,
85    old_catalog: &Arc<TableCatalog>,
86    sql_column_strategy: SqlColumnStrategy,
87) -> Result<(
88    Option<SourceCatalog>,
89    TableCatalog,
90    StreamFragmentGraph,
91    TableJobType,
92)> {
93    // Create handler args as if we're creating a new table with the altered definition.
94    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
95    let col_id_gen = ColumnIdGenerator::new_alter(old_catalog);
96
97    let (graph, table, source, job_type) = generate_stream_graph_for_replace_table(
98        session,
99        table_name,
100        old_catalog,
101        handler_args.clone(),
102        new_definition,
103        col_id_gen,
104        sql_column_strategy,
105    )
106    .await?;
107
108    // Set some fields ourselves so that the meta service does not need to maintain them.
109    let mut table = table;
110    table.vnode_count = VnodeCount::set(old_catalog.vnode_count());
111
112    Ok((source, table, graph, job_type))
113}
114
115/// Handle `ALTER TABLE [ADD|DROP] COLUMN` statements. The `operation` must be either `AddColumn` or
116/// `DropColumn`.
117pub async fn handle_alter_table_column(
118    handler_args: HandlerArgs,
119    table_name: ObjectName,
120    operation: AlterTableOperation,
121) -> Result<RwPgResponse> {
122    let session = handler_args.session;
123    let (original_catalog, has_incoming_sinks) =
124        fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
125
126    if original_catalog.webhook_info.is_some() {
127        return Err(RwError::from(ErrorCode::BindError(
128            "Adding/dropping a column of a table with webhook has not been implemented.".to_owned(),
129        )));
130    }
131
132    // Retrieve the original table definition and parse it to AST.
133    let mut definition = original_catalog.create_sql_ast_purified()?;
134    let Statement::CreateTable { columns, .. } = &mut definition else {
135        panic!("unexpected statement: {:?}", definition);
136    };
137
138    if has_incoming_sinks && matches!(operation, AlterTableOperation::DropColumn { .. }) {
139        return Err(ErrorCode::InvalidInputSyntax(
140            "dropping columns in target table of sinks is not supported".to_owned(),
141        ))?;
142    }
143
144    // The `sql_column_strategy` will be `FollowChecked` if the operation is `AddColumn`, and
145    // `FollowUnchecked` if the operation is `DropColumn`.
146    //
147    // Consider the following example:
148    // - There was a column `foo` and a generated column `gen` that references `foo`.
149    // - The external schema is updated to remove `foo`.
150    // - The user tries to drop `foo` from the table.
151    //
152    // Dropping `foo` directly will fail because `gen` references `foo`. However, dropping `gen`
153    // first will also be rejected because `foo` does not exist any more. Also, executing
154    // `REFRESH SCHEMA` will not help because it keeps the generated column. The user gets stuck.
155    //
156    // `FollowUnchecked` workarounds this issue. There are also some alternatives:
157    // - Allow dropping multiple columns at once.
158    // - Check against the persisted schema, instead of resolving again.
159    //
160    // Applied only to tables with schema registry.
161    let sql_column_strategy = match operation {
162        AlterTableOperation::AddColumn {
163            column_def: new_column,
164        } => {
165            // Duplicated names can actually be checked by `StreamMaterialize`. We do here for
166            // better error reporting.
167            let new_column_name = new_column.name.real_value();
168            if columns
169                .iter()
170                .any(|c| c.name.real_value() == new_column_name)
171            {
172                Err(ErrorCode::InvalidInputSyntax(format!(
173                    "column \"{new_column_name}\" of table \"{table_name}\" already exists"
174                )))?
175            }
176
177            if new_column
178                .options
179                .iter()
180                .any(|x| matches!(x.option, ColumnOption::GeneratedColumns(_)))
181            {
182                Err(ErrorCode::InvalidInputSyntax(
183                    "alter table add generated columns is not supported".to_owned(),
184                ))?
185            }
186
187            if new_column
188                .options
189                .iter()
190                .any(|x| matches!(x.option, ColumnOption::NotNull))
191                && !new_column
192                    .options
193                    .iter()
194                    .any(|x| matches!(x.option, ColumnOption::DefaultValue(_)))
195            {
196                return Err(ErrorCode::InvalidInputSyntax(
197                    "alter table add NOT NULL columns must have default value".to_owned(),
198                ))?;
199            }
200
201            // Add the new column to the table definition if it is not created by `create table (*)` syntax.
202            columns.push(new_column);
203
204            SqlColumnStrategy::FollowChecked
205        }
206
207        AlterTableOperation::DropColumn {
208            column_name,
209            if_exists,
210            cascade,
211        } => {
212            if cascade {
213                bail_not_implemented!(issue = 6903, "drop column cascade");
214            }
215
216            // Check if the column to drop is referenced by any generated columns.
217            for column in original_catalog.columns() {
218                if let Some(expr) = column.generated_expr() {
219                    let expr = ExprImpl::from_expr_proto(expr)?;
220                    let refs = expr.collect_input_refs(original_catalog.columns().len());
221                    for idx in refs.ones() {
222                        let refed_column = &original_catalog.columns()[idx];
223                        if refed_column.name() == column_name.real_value() {
224                            bail!(format!(
225                                "failed to drop column \"{}\" because it's referenced by a generated column \"{}\"",
226                                column_name,
227                                column.name()
228                            ))
229                        }
230                    }
231                }
232            }
233
234            // Locate the column by name and remove it.
235            let column_name = column_name.real_value();
236            let removed_column = columns
237                .extract_if(.., |c| c.name.real_value() == column_name)
238                .at_most_one()
239                .ok()
240                .unwrap();
241
242            if removed_column.is_some() {
243                // PASS
244            } else if if_exists {
245                return Ok(PgResponse::builder(StatementType::ALTER_TABLE)
246                    .notice(format!(
247                        "column \"{}\" does not exist, skipping",
248                        column_name
249                    ))
250                    .into());
251            } else {
252                Err(ErrorCode::InvalidInputSyntax(format!(
253                    "column \"{}\" of table \"{}\" does not exist",
254                    column_name, table_name
255                )))?
256            }
257
258            SqlColumnStrategy::FollowUnchecked
259        }
260
261        AlterTableOperation::AlterColumn { column_name, op } => {
262            let AlterColumnOperation::SetDataType {
263                data_type,
264                using: None,
265            } = op
266            else {
267                bail_not_implemented!(issue = 6903, "{op}");
268            };
269
270            // Locate the column by name and update its data type.
271            let column_name = column_name.real_value();
272            let column = columns
273                .iter_mut()
274                .find(|c| c.name.real_value() == column_name)
275                .ok_or_else(|| {
276                    ErrorCode::InvalidInputSyntax(format!(
277                        "column \"{}\" of table \"{}\" does not exist",
278                        column_name, table_name
279                    ))
280                })?;
281
282            column.data_type = Some(data_type);
283
284            SqlColumnStrategy::FollowChecked
285        }
286
287        _ => unreachable!(),
288    };
289    let (source, table, graph, job_type) = get_replace_table_plan(
290        &session,
291        table_name,
292        definition,
293        &original_catalog,
294        sql_column_strategy,
295    )
296    .await?;
297
298    let catalog_writer = session.catalog_writer()?;
299
300    catalog_writer
301        .replace_table(
302            source.map(|x| x.to_prost()),
303            table.to_prost(),
304            graph,
305            job_type,
306        )
307        .await?;
308    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
309}
310
311pub fn fetch_table_catalog_for_alter(
312    session: &SessionImpl,
313    table_name: &ObjectName,
314) -> Result<(Arc<TableCatalog>, bool)> {
315    let db_name = &session.database();
316    let (schema_name, real_table_name) =
317        Binder::resolve_schema_qualified_name(db_name, table_name)?;
318    let search_path = session.config().search_path();
319    let user_name = &session.user_name();
320
321    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
322
323    {
324        let reader = session.env().catalog_reader().read_guard();
325        let (table, schema_name) =
326            reader.get_created_table_by_name(db_name, schema_path, &real_table_name)?;
327
328        match table.table_type() {
329            TableType::Table => {}
330
331            _ => Err(ErrorCode::InvalidInputSyntax(format!(
332                "\"{table_name}\" is not a table or cannot be altered"
333            )))?,
334        }
335
336        session.check_privilege_for_drop_alter(schema_name, &**table)?;
337
338        let has_incoming_sinks = reader
339            .get_schema_by_id(&table.database_id, &table.schema_id)?
340            .table_incoming_sinks(table.id)
341            .map(|sinks| !sinks.is_empty())
342            .unwrap_or(false);
343
344        Ok((table.clone(), has_incoming_sinks))
345    }
346}
347
348#[cfg(test)]
349mod tests {
350    use std::collections::HashMap;
351
352    use risingwave_common::catalog::{
353        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME,
354    };
355    use risingwave_common::types::DataType;
356
357    use crate::catalog::root_catalog::SchemaPath;
358    use crate::test_utils::LocalFrontend;
359
360    #[tokio::test]
361    async fn test_add_column_handler() {
362        let frontend = LocalFrontend::new(Default::default()).await;
363        let session = frontend.session_ref();
364        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);
365
366        let sql = "create table t (i int, r real);";
367        frontend.run_sql(sql).await.unwrap();
368
369        let get_table = || {
370            let catalog_reader = session.env().catalog_reader().read_guard();
371            catalog_reader
372                .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "t")
373                .unwrap()
374                .0
375                .clone()
376        };
377
378        let table = get_table();
379
380        let columns: HashMap<_, _> = table
381            .columns
382            .iter()
383            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
384            .collect();
385
386        // Alter the table.
387        let sql = "alter table t add column s text;";
388        frontend.run_sql(sql).await.unwrap();
389
390        let altered_table = get_table();
391
392        let altered_columns: HashMap<_, _> = altered_table
393            .columns
394            .iter()
395            .map(|col| (col.name(), (col.data_type().clone(), col.column_id())))
396            .collect();
397
398        // Check the new column.
399        assert_eq!(columns.len() + 1, altered_columns.len());
400        assert_eq!(altered_columns["s"].0, DataType::Varchar);
401
402        // Check the old columns and IDs are not changed.
403        assert_eq!(columns["i"], altered_columns["i"]);
404        assert_eq!(columns["r"], altered_columns["r"]);
405        assert_eq!(
406            columns[ROW_ID_COLUMN_NAME],
407            altered_columns[ROW_ID_COLUMN_NAME]
408        );
409
410        // Check the version is updated.
411        assert_eq!(
412            table.version.as_ref().unwrap().version_id + 1,
413            altered_table.version.as_ref().unwrap().version_id
414        );
415        assert_eq!(
416            table.version.as_ref().unwrap().next_column_id.next(),
417            altered_table.version.as_ref().unwrap().next_column_id
418        );
419    }
420}