risingwave_frontend/handler/
alter_table_with_sr.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
// Copyright 2025 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::{anyhow, Context};
use fancy_regex::Regex;
use pgwire::pg_response::{PgResponse, StatementType};
use risingwave_common::bail_not_implemented;
use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
use thiserror_ext::AsReport;

use super::alter_table_column::fetch_table_catalog_for_alter;
use super::create_source::{schema_has_schema_registry, SqlColumnStrategy};
use super::util::SourceSchemaCompatExt;
use super::{get_replace_table_plan, HandlerArgs, RwPgResponse};
use crate::error::{ErrorCode, Result};
use crate::TableCatalog;

fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
    let stmt = table.create_sql_ast()?;
    let Statement::CreateTable { format_encode, .. } = stmt else {
        unreachable!()
    };
    Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
}

pub async fn handle_refresh_schema(
    handler_args: HandlerArgs,
    table_name: ObjectName,
) -> Result<RwPgResponse> {
    let session = handler_args.session;
    let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;

    if !original_table.incoming_sinks.is_empty() {
        bail_not_implemented!("alter table with incoming sinks");
    }

    let format_encode = get_format_encode_from_table(&original_table)?;
    if !format_encode
        .as_ref()
        .is_some_and(schema_has_schema_registry)
    {
        return Err(ErrorCode::NotSupported(
            "tables without schema registry cannot be refreshed".to_owned(),
            "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
        )
        .into());
    }

    let definition = original_table
        .create_sql_ast_purified()
        .context("unable to parse original table definition")?;

    let (source, table, graph, col_index_mapping, job_type) = {
        let result = get_replace_table_plan(
            &session,
            table_name,
            definition,
            &original_table,
            SqlColumnStrategy::Ignore,
        )
        .await;
        match result {
            Ok((source, table, graph, col_index_mapping, job_type)) => {
                Ok((source, table, graph, col_index_mapping, job_type))
            }
            Err(e) => {
                let report = e.to_report_string();
                // NOTE(yuhao): This is a workaround for reporting errors when columns to drop is referenced by generated column.
                // Finding the actual columns to drop requires generating `PbSource` from the sql definition
                // and fetching schema from schema registry, which will cause a lot of unnecessary refactor.
                // Here we match the error message to yield when failing to bind generated column exprs.
                let re =
                    Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
                let captures = re.captures(&report).map_err(anyhow::Error::from)?;
                if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
                    Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
                gen_col_name.as_str())).into())
                } else {
                    Err(e)
                }
            }
        }
    }?;
    let catalog_writer = session.catalog_writer()?;

    catalog_writer
        .replace_table(source, table, graph, col_index_mapping, job_type)
        .await?;

    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
}