risingwave_frontend/handler/
alter_table_with_sr.rs1use anyhow::{Context, anyhow};
16use fancy_regex::Regex;
17use pgwire::pg_response::{PgResponse, StatementType};
18use risingwave_common::bail_not_implemented;
19use risingwave_sqlparser::ast::{FormatEncodeOptions, ObjectName, Statement};
20use thiserror_ext::AsReport;
21
22use super::alter_table_column::fetch_table_catalog_for_alter;
23use super::create_source::{SqlColumnStrategy, schema_has_schema_registry};
24use super::util::SourceSchemaCompatExt;
25use super::{HandlerArgs, RwPgResponse, get_replace_table_plan};
26use crate::TableCatalog;
27use crate::error::{ErrorCode, Result};
28
29fn get_format_encode_from_table(table: &TableCatalog) -> Result<Option<FormatEncodeOptions>> {
30 let stmt = table.create_sql_ast()?;
31 let Statement::CreateTable { format_encode, .. } = stmt else {
32 unreachable!()
33 };
34 Ok(format_encode.map(|schema| schema.into_v2_with_warning()))
35}
36
37pub async fn handle_refresh_schema(
38 handler_args: HandlerArgs,
39 table_name: ObjectName,
40) -> Result<RwPgResponse> {
41 let session = handler_args.session;
42 let (original_table, has_incoming_sinks) =
43 fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
44
45 if has_incoming_sinks {
46 bail_not_implemented!("alter table with incoming sinks");
47 }
48
49 let format_encode = get_format_encode_from_table(&original_table)?;
50 if !format_encode
51 .as_ref()
52 .is_some_and(schema_has_schema_registry)
53 {
54 return Err(ErrorCode::NotSupported(
55 "tables without schema registry cannot be refreshed".to_owned(),
56 "try `ALTER TABLE .. ADD/DROP COLUMN ...` instead".to_owned(),
57 )
58 .into());
59 }
60
61 let definition = original_table
62 .create_sql_ast_purified()
63 .context("unable to parse original table definition")?;
64
65 let (source, table, graph, job_type) = {
66 let result = get_replace_table_plan(
67 &session,
68 table_name,
69 definition,
70 &original_table,
71 SqlColumnStrategy::Ignore,
72 )
73 .await;
74 match result {
75 Ok((source, table, graph, job_type)) => Ok((source, table, graph, job_type)),
76 Err(e) => {
77 let report = e.to_report_string();
78 let re =
83 Regex::new(r#"fail to bind expression in generated column "(.*?)""#).unwrap();
84 let captures = re.captures(&report).map_err(anyhow::Error::from)?;
85 if let Some(gen_col_name) = captures.and_then(|captures| captures.get(1)) {
86 Err(anyhow!(e).context(format!("failed to refresh schema because some of the columns to drop are referenced by a generated column \"{}\"",
87 gen_col_name.as_str())).into())
88 } else {
89 Err(e)
90 }
91 }
92 }
93 }?;
94 let catalog_writer = session.catalog_writer()?;
95
96 catalog_writer
97 .replace_table(
98 source.map(|x| x.to_prost()),
99 table.to_prost(),
100 graph,
101 job_type,
102 )
103 .await?;
104
105 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
106}