risingwave_frontend/handler/
alter_table_with_sr.rs1use anyhow::{Context, anyhow};
16use fancy_regex::Regex;
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
20use thiserror_ext::AsReport;
21
22use super::alter_table_column::fetch_table_catalog_for_alter;
23use super::create_source::{SqlColumnStrategy, schema_has_schema_registry};
24use super::util::SourceSchemaCompatExt;
25use super::{HandlerArgs, RwPgResponse, get_replace_table_plan};
26use crate::TableCatalog;
27use crate::error::{ErrorCode, Result};
28
29fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
30 let stmt = table.create_sql_ast()?;
31 let Statement::CreateTable { format_encode, .. } = stmt else {
32 unreachable!()
33 };
34 Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
35}
36
37pub async fn handle_refresh_schema(
38 handler_args: HandlerArgs,
39 table_name: ObjectName,
40) -> Result<RwPgResponse> {
41 let session = handler_args.session;
42 let original_table = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
43
44 if !original_table.incoming_sinks.is_empty() {
45 bail_not_implemented!("alter table with incoming sinks");
46 }
47
48 let format_encode = get_format_encode_from_table(&original_table)?;
49 if !format_encode
50 .as_ref()
51 .is_some_and(schema_has_schema_registry)
52 {
53 return Err(ErrorCode::NotSupported(
54 "tables without schema registry cannot be refreshed".to_owned(),
55 "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
56 )
57 .into());
58 }
59
60 let definition = original_table
61 .create_sql_ast_purified()
62 .context("unable to parse original table definition")?;
63
64 let (source, table, graph, col_index_mapping, job_type) = {
65 let result = get_replace_table_plan(
66 &session,
67 table_name,
68 definition,
69 &original_table,
70 SqlColumnStrategy::Ignore,
71 )
72 .await;
73 match result {
74 Ok((source, table, graph, col_index_mapping, job_type)) => {
75 Ok((source, table, graph, col_index_mapping, job_type))
76 }
77 Err(e) => {
78 let report = e.to_report_string();
79 let re =
84 Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
85 let captures = re.captures(&report).map_err(anyhow::Error::from)?;
86 if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
87 Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
88 gen_col_name.as_str())).into())
89 } else {
90 Err(e)
91 }
92 }
93 }
94 }?;
95 let catalog_writer = session.catalog_writer()?;
96
97 catalog_writer
98 .replace_table(source, table, graph, col_index_mapping, job_type)
99 .await?;
100
101 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
102}