risingwave_frontend/handler/
alter_watermark.rs

1// Copyright 2026 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use pgwire::pg_response::{PgResponse, StatementType};
16use risingwave_common::util::iter_util::ZipEqFast;
17use risingwave_sqlparser::ast::{Expr, Ident, ObjectName, Statement};
18
19use super::alter_table_column::{fetch_table_catalog_for_alter, get_replace_table_plan};
20use super::create_source::SqlColumnStrategy;
21use super::{HandlerArgs, RwPgResponse};
22use crate::TableCatalog;
23use crate::error::{ErrorCode, Result, RwError};
24
25pub async fn handle_alter_watermark(
26    handler_args: HandlerArgs,
27    table_name: ObjectName,
28    column_name: Ident,
29    expr: Expr,
30    with_ttl: bool,
31) -> Result<RwPgResponse> {
32    let session = handler_args.session;
33    let (original_catalog, _) = fetch_table_catalog_for_alter(session.as_ref(), &table_name)?;
34
35    if original_catalog.webhook_info.is_some() {
36        return Err(ErrorCode::BindError(
37            "ALTER WATERMARK on a table with webhook has not been implemented.".to_owned(),
38        )
39        .into());
40    }
41
42    let mut definition = original_catalog.create_sql_ast_purified()?;
43    let Statement::CreateTable {
44        source_watermarks, ..
45    } = &mut definition
46    else {
47        panic!("unexpected statement: {:?}", definition);
48    };
49
50    let column_real_value = column_name.real_value();
51    let existing = source_watermarks
52        .iter_mut()
53        .find(|w| w.column.real_value() == column_real_value)
54        .ok_or_else(|| {
55            ErrorCode::InvalidInputSyntax(format!(
56                "no watermark defined on column \"{}\" of table \"{}\"",
57                column_real_value, table_name
58            ))
59        })?;
60
61    // v1: toggling WITH TTL changes `stream_key` / `clean_watermark_indices` on the
62    // materialize catalog, which `fit_internal_tables_trivial` cannot safely reconcile.
63    // Deferred to v2.
64    if existing.with_ttl != with_ttl {
65        return Err(ErrorCode::NotSupported(
66            "toggling WITH TTL via ALTER WATERMARK is not supported".to_owned(),
67            "drop and recreate the table".to_owned(),
68        )
69        .into());
70    }
71
72    existing.expr = expr;
73
74    let (source, new_table, graph, job_type) = Box::pin(get_replace_table_plan(
75        &session,
76        table_name,
77        definition,
78        &original_catalog,
79        SqlColumnStrategy::FollowChecked,
80    ))
81    .await?;
82
83    check_replace_safe(&original_catalog, &new_table)?;
84
85    let catalog_writer = session.catalog_writer()?;
86    catalog_writer
87        .replace_table(
88            source.map(|x| x.to_prost()),
89            new_table.to_prost(),
90            graph,
91            job_type,
92        )
93        .await?;
94    Ok(PgResponse::empty_result(StatementType::ALTER_TABLE))
95}
96
97// Defense-in-depth. Today every v1 input that survives the AST guard above also
98// survives `get_replace_table_plan`'s binder type check, so none of the diffs below
99// can fire for valid input. The check is kept for two reasons:
100//   1. Future relaxations (e.g. allowing TTL toggle in v2) will start producing
101//      `stream_key` / `clean_watermark_indices` diffs and need this guard.
102//   2. The meta-side `fit_internal_tables_trivial` path is a wholesale-overwrite
103//      that would silently corrupt state if it ever paired tables with mismatched
104//      schemas; this is the last line of defense before that happens.
105fn check_replace_safe(old: &TableCatalog, new: &TableCatalog) -> Result<()> {
106    fn diff<T: std::fmt::Debug>(field: &str, old: T, new: T) -> RwError {
107        ErrorCode::NotSupported(
108            format!(
109                "ALTER WATERMARK would change `{}` of the table\nold: {:?}\nnew: {:?}",
110                field, old, new
111            ),
112            "only changes that preserve the plan's state schema are supported; \
113             drop and recreate the table for shape changes"
114                .to_owned(),
115        )
116        .into()
117    }
118
119    if old.stream_key != new.stream_key {
120        return Err(diff("stream_key", &old.stream_key, &new.stream_key));
121    }
122    if old.pk != new.pk {
123        return Err(diff("pk", &old.pk, &new.pk));
124    }
125    let old_wm: Vec<_> = old.watermark_columns.ones().collect();
126    let new_wm: Vec<_> = new.watermark_columns.ones().collect();
127    if old_wm != new_wm {
128        return Err(diff("watermark_columns", &old_wm, &new_wm));
129    }
130    if old.clean_watermark_indices != new.clean_watermark_indices {
131        return Err(diff(
132            "clean_watermark_indices",
133            &old.clean_watermark_indices,
134            &new.clean_watermark_indices,
135        ));
136    }
137    if old.clean_watermark_index_in_pk != new.clean_watermark_index_in_pk {
138        return Err(diff(
139            "clean_watermark_index_in_pk",
140            &old.clean_watermark_index_in_pk,
141            &new.clean_watermark_index_in_pk,
142        ));
143    }
144    // `original_catalog` is loaded via `TableCatalog::from_prost`, which appends the
145    // `_rw_timestamp` system column to `columns`. The freshly-built `new` table from
146    // `get_replace_table_plan` doesn't go through `from_prost` and therefore lacks it.
147    // Compare only user-visible columns to avoid a spurious mismatch.
148    let old_cols = old.columns_without_rw_timestamp();
149    let new_cols = new.columns_without_rw_timestamp();
150    if old_cols.len() != new_cols.len() {
151        return Err(diff("columns.len()", &old_cols.len(), &new_cols.len()));
152    }
153    for (i, (o, n)) in old_cols.iter().zip_eq_fast(new_cols.iter()).enumerate() {
154        if o.data_type() != n.data_type() {
155            return Err(ErrorCode::NotSupported(
156                format!(
157                    "ALTER WATERMARK would change the data type of column \"{}\" (index {}): {:?} -> {:?}",
158                    o.name(), i, o.data_type(), n.data_type()
159                ),
160                "drop and recreate the table".to_owned(),
161            )
162            .into());
163        }
164    }
165    Ok(())
166}