risingwave_frontend/handler/
alter_watermark.rs1use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::util::iter_util::ZipEqFast;
17use risingwave_sqlparser::ast::{Expr, Ident, ObjectName, Statement};
18
19use super::alter_table_column::{fetch_table_catalog_for_alter, get_replace_table_plan};
20use super::create_source::SqlColumnStrategy;
21use super::{HandlerArgs, RwPgResponse};
22use crate::TableCatalog;
23use crate::error::{ErrorCode, Result, RwError};
24
25pub async fn handle_alter_watermark(
26 handler_args: HandlerArgs,
27 table_name: ObjectName,
28 column_name: Ident,
29 expr: Expr,
30 with_ttl: bool,
31) -> Result<RwPgResponse> {
32 let session = handler_args.session;
33 let (original_catalog, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
34
35 if original_catalog.webhook_info.is_some() {
36 return Err(ErrorCode::BindError(
37 "ALTER WATERMARK on a table with webhook has not been implemented.".to_owned(),
38 )
39 .into());
40 }
41
42 let mut definition = original_catalog.create_sql_ast_purified()?;
43 let Statement::CreateTable {
44 source_watermarks, ..
45 } = &mut definition
46 else {
47 panic!("unexpected statement: {:?}", definition);
48 };
49
50 let column_real_value = column_name.real_value();
51 let existing = source_watermarks
52 .iter_mut()
53 .find(|w| w.column.real_value() == column_real_value)
54 .ok_or_else(|| {
55 ErrorCode::InvalidInputSyntax(format!(
56 "no watermark defined on column \"{}\" of table \"{}\"",
57 column_real_value, table_name
58 ))
59 })?;
60
61 if existing.with_ttl != with_ttl {
65 return Err(ErrorCode::NotSupported(
66 "toggling WITH TTL via ALTER WATERMARK is not supported".to_owned(),
67 "drop and recreate the table".to_owned(),
68 )
69 .into());
70 }
71
72 existing.expr = expr;
73
74 let (source, new_table, graph, job_type) = Box::pin(get_replace_table_plan(
75 &session,
76 table_name,
77 definition,
78 &original_catalog,
79 SqlColumnStrategy::FollowChecked,
80 ))
81 .await?;
82
83 check_replace_safe(&original_catalog, &new_table)?;
84
85 let catalog_writer = session.catalog_writer()?;
86 catalog_writer
87 .replace_table(
88 source.map(|x| x.to_prost()),
89 new_table.to_prost(),
90 graph,
91 job_type,
92 )
93 .await?;
94 Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
95}
96
97fn check_replace_safe(old: &TableCatalog, new: &TableCatalog) -> Result<()> {
106 fn diff<T: std::fmt::Debug>(field: &str, old: T, new: T) -> RwError {
107 ErrorCode::NotSupported(
108 format!(
109 "ALTER WATERMARK would change `{}` of the table\nold: {:?}\nnew: {:?}",
110 field, old, new
111 ),
112 "only changes that preserve the plan's state schema are supported; \
113 drop and recreate the table for shape changes"
114 .to_owned(),
115 )
116 .into()
117 }
118
119 if old.stream_key != new.stream_key {
120 return Err(diff("stream_key", &old.stream_key, &new.stream_key));
121 }
122 if old.pk != new.pk {
123 return Err(diff("pk", &old.pk, &new.pk));
124 }
125 let old_wm: Vec<_> = old.watermark_columns.ones().collect();
126 let new_wm: Vec<_> = new.watermark_columns.ones().collect();
127 if old_wm != new_wm {
128 return Err(diff("watermark_columns", &old_wm, &new_wm));
129 }
130 if old.clean_watermark_indices != new.clean_watermark_indices {
131 return Err(diff(
132 "clean_watermark_indices",
133 &old.clean_watermark_indices,
134 &new.clean_watermark_indices,
135 ));
136 }
137 if old.clean_watermark_index_in_pk != new.clean_watermark_index_in_pk {
138 return Err(diff(
139 "clean_watermark_index_in_pk",
140 &old.clean_watermark_index_in_pk,
141 &new.clean_watermark_index_in_pk,
142 ));
143 }
144 let old_cols = old.columns_without_rw_timestamp();
149 let new_cols = new.columns_without_rw_timestamp();
150 if old_cols.len() != new_cols.len() {
151 return Err(diff("columns.len()", &old_cols.len(), &new_cols.len()));
152 }
153 for (i, (o, n)) in old_cols.iter().zip_eq_fast(new_cols.iter()).enumerate() {
154 if o.data_type() != n.data_type() {
155 return Err(ErrorCode::NotSupported(
156 format!(
157 "ALTER WATERMARK would change the data type of column \"{}\" (index {}): {:?} -> {:?}",
158 o.name(), i, o.data_type(), n.data_type()
159 ),
160 "drop and recreate the table".to_owned(),
161 )
162 .into());
163 }
164 }
165 Ok(())
166}