risingwave_frontend/handler/
alter_table_with_sr.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use anyhow::{Context, anyhow};
16use fancy_regex::Regex;
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
20use thiserror_ext::AsReport;
21
22use super::alter_table_column::fetch_table_catalog_for_alter;
23use super::create_source::{SqlColumnStrategy, schema_has_schema_registry};
24use super::util::SourceSchemaCompatExt;
25use super::{HandlerArgs, RwPgResponse, get_replace_table_plan};
26use crate::TableCatalog;
27use crate::error::{ErrorCode, Result};
28
29fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
30    let stmt = table.create_sql_ast()?;
31    let Statement::CreateTable { format_encode, .. } = stmt else {
32        unreachable!()
33    };
34    Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
35}
36
37pub async fn handle_refresh_schema(
38    handler_args: HandlerArgs,
39    table_name: ObjectName,
40) -> Result<RwPgResponse> {
41    let session = handler_args.session;
42    let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
43
44    if !original_table.incoming_sinks.is_empty() {
45        bail_not_implemented!("alter table with incoming sinks");
46    }
47
48    let format_encode = get_format_encode_from_table(&original_table)?;
49    if !format_encode
50        .as_ref()
51        .is_some_and(schema_has_schema_registry)
52    {
53        return Err(ErrorCode::NotSupported(
54            "tables without schema registry cannot be refreshed".to_owned(),
55            "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
56        )
57        .into());
58    }
59
60    let definition = original_table
61        .create_sql_ast_purified()
62        .context("unable to parse original table definition")?;
63
64    let (source, table, graph, col_index_mapping, job_type) = {
65        let result = get_replace_table_plan(
66            &session,
67            table_name,
68            definition,
69            &original_table,
70            SqlColumnStrategy::Ignore,
71        )
72        .await;
73        match result {
74            Ok((source, table, graph, col_index_mapping, job_type)) => {
75                Ok((source, table, graph, col_index_mapping, job_type))
76            }
77            Err(e) => {
78                let report = e.to_report_string();
79                // NOTE(yuhao): This is a workaround for reporting errors when columns to drop is referenced by generated column.
80                // Finding the actual columns to drop requires generating `PbSource` from the sql definition
81                // and fetching schema from schema registry, which will cause a lot of unnecessary refactor.
82                // Here we match the error message to yield when failing to bind generated column exprs.
83                let re =
84                    Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
85                let captures = re.captures(&report).map_err(anyhow::Error::from)?;
86                if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
87                    Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
88                gen_col_name.as_str())).into())
89                } else {
90                    Err(e)
91                }
92            }
93        }
94    }?;
95    let catalog_writer = session.catalog_writer()?;
96
97    catalog_writer
98        .replace_table(source, table, graph, col_index_mapping, job_type)
99        .await?;
100
101    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
102}