risingwave_frontend/handler/
alter_table_with_sr.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use fancy_regex::Regex;
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
20use thiserror_ext::AsReport;
21
22use super::alter_table_column::fetch_table_catalog_for_alter;
23use super::create_source::{SqlColumnStrategy, schema_has_schema_registry};
24use super::util::SourceSchemaCompatExt;
25use super::{HandlerArgs, RwPgResponse, get_replace_table_plan};
26use crate::TableCatalog;
27use crate::error::{ErrorCode, Result};
28
29fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
30    let stmt = table.create_sql_ast()?;
31    let Statement::CreateTable { format_encode, .. } = stmt else {
32        unreachable!()
33    };
34    Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
35}
36
37pub async fn handle_refresh_schema(
38    handler_args: HandlerArgs,
39    table_name: ObjectName,
40) -> Result<RwPgResponse> {
41    let session = handler_args.session;
42    let (original_table, has_incoming_sinks) =
43        fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
44
45    if has_incoming_sinks {
46        bail_not_implemented!("alter table with incoming sinks");
47    }
48
49    let format_encode = get_format_encode_from_table(&original_table)?;
50    if !format_encode
51        .as_ref()
52        .is_some_and(schema_has_schema_registry)
53    {
54        return Err(ErrorCode::NotSupported(
55            "tables without schema registry cannot be refreshed".to_owned(),
56            "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
57        )
58        .into());
59    }
60
61    let definition = original_table
62        .create_sql_ast_purified()
63        .context("unable to parse original table definition")?;
64
65    let (source, table, graph, job_type) = {
66        let result = get_replace_table_plan(
67            &session,
68            table_name,
69            definition,
70            &original_table,
71            SqlColumnStrategy::Ignore,
72        )
73        .await;
74        match result {
75            Ok((source, table, graph, job_type)) => Ok((source, table, graph, job_type)),
76            Err(e) => {
77                let report = e.to_report_string();
78                // NOTE(yuhao): This is a workaround for reporting errors when columns to drop is referenced by generated column.
79                // Finding the actual columns to drop requires generating `PbSource` from the sql definition
80                // and fetching schema from schema registry, which will cause a lot of unnecessary refactor.
81                // Here we match the error message to yield when failing to bind generated column exprs.
82                let re =
83                    Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
84                let captures = re.captures(&report).map_err(anyhow::Error::from)?;
85                if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
86                    Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
87                gen_col_name.as_str())).into())
88                } else {
89                    Err(e)
90                }
91            }
92        }
93    }?;
94    let catalog_writer = session.catalog_writer()?;
95
96    catalog_writer
97        .replace_table(
98            source.map(|x| x.to_prost()),
99            table.to_prost(),
100            graph,
101            job_type,
102        )
103        .await?;
104
105    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
106}