risingwave_frontend/handler/
alter_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::TableCatalog;
25use crate::binder::{Binder, BoundQuery};
26use crate::catalog::TableId;
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33/// Fetch materialized view catalog for alter operations, similar to `fetch_table_catalog_for_alter`
34/// but checks for `TableType::MaterializedView`
35pub fn fetch_mv_catalog_for_alter(
36    session: &SessionImpl,
37    mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39    let db_name = &session.database();
40    let (schema_name, real_mv_name) =
41        Binder::resolve_schema_qualified_name(db_name, mv_name.clone())?;
42    let search_path = session.config().search_path();
43    let user_name = &session.user_name();
44
45    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
46
47    let original_catalog = {
48        let reader = session.env().catalog_reader().read_guard();
49        let (table, schema_name) =
50            reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
51
52        match table.table_type() {
53            TableType::MaterializedView => {}
54
55            _ => Err(ErrorCode::InvalidInputSyntax(format!(
56                "\"{mv_name}\" is not a materialized view or cannot be altered"
57            )))?,
58        }
59
60        session.check_privilege_for_drop_alter(schema_name, &**table)?;
61
62        table.clone()
63    };
64
65    Ok(original_catalog)
66}
67
68// TODO(alter-mv): The current implementation is a WIP and may not work at all yet.
69pub async fn handle_alter_mv(
70    handler_args: HandlerArgs,
71    name: ObjectName,
72    new_query: Box<Query>,
73) -> Result<RwPgResponse> {
74    let session = handler_args.session.clone();
75    let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
76
77    // Retrieve the original MV definition and parse it to AST
78    let original_definition = original_catalog.create_sql_ast()?;
79
80    // Extract unchanged parts from the original definition
81    let (columns, with_options, emit_mode) = match &original_definition {
82        Statement::CreateView {
83            columns,
84            with_options,
85            emit_mode,
86            ..
87        } => (columns.clone(), with_options.clone(), emit_mode.clone()),
88        _ => {
89            return Err(ErrorCode::InternalError(format!(
90                "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
91                original_definition
92            ))
93            .into());
94        }
95    };
96
97    // Create a new CREATE MATERIALIZED VIEW statement with the new query
98    let new_definition = Statement::CreateView {
99        or_replace: false,
100        materialized: true,
101        if_not_exists: false,
102        name: name.clone(),
103        columns: columns.clone(),
104        query: new_query.clone(),
105        with_options,
106        emit_mode: emit_mode.clone(),
107    };
108    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
109
110    let (dependent_relations, dependent_udfs, bound_query) = {
111        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
112        let bound_query = binder.bind_query(*new_query)?;
113        (
114            binder.included_relations().clone(),
115            binder.included_udfs().clone(),
116            bound_query,
117        )
118    };
119
120    handle_alter_mv_bound(
121        handler_args,
122        name,
123        bound_query,
124        dependent_relations,
125        dependent_udfs,
126        columns,
127        emit_mode,
128        original_catalog,
129    )
130    .await
131}
132
133async fn handle_alter_mv_bound(
134    handler_args: HandlerArgs,
135    name: ObjectName,
136    query: BoundQuery,
137    dependent_relations: HashSet<TableId>,
138    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
139    columns: Vec<Ident>,
140    emit_mode: Option<EmitMode>,
141    original_catalog: Arc<TableCatalog>,
142) -> Result<RwPgResponse> {
143    let session = handler_args.session.clone();
144
145    // TODO(alter-mv): use `ColumnIdGenerator` to generate IDs for MV columns, in order to
146    // support schema changes.
147    let (mut table, graph, _dependencies, _resource_group) = {
148        create_mv::gen_create_mv_graph(
149            handler_args,
150            name,
151            query,
152            dependent_relations,
153            dependent_udfs,
154            columns,
155            emit_mode,
156        )
157        .await?
158    };
159
160    // After alter, the data of the MV is not guaranteed to be consistent.
161    // Always set the conflict handler to avoid producing inconsistent changes to downstream.
162    table.conflict_behavior = ConflictBehavior::Overwrite;
163
164    // Set some fields ourselves so that the meta service does not need to maintain them.
165    table.id = original_catalog.id;
166    assert!(
167        table.incoming_sinks.is_empty(),
168        "materialized view should not have incoming sinks"
169    );
170    table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
171
172    // TODO(alter-mv): check changes on dependencies
173
174    // Validate if the new table is compatible with the original one.
175    // Internal tables will be checked in the meta service.
176    // TODO(alter-mv): improve this to make it more robust and friendly.
177    {
178        // Convert back and forth to normalize the `rw_timestamp` column.
179        // TODO: make `rw_timestamp` fully virtual to avoid this workaround.
180        let mut new_table = TableCatalog::from(table.to_prost());
181        let mut original_table = original_catalog.as_ref().clone();
182
183        macro_rules! ignore_field {
184            ($($field:ident) ,* $(,)?) => {
185                $(
186                    new_table.$field = Default::default();
187                    original_table.$field = Default::default();
188                )*
189            };
190        }
191
192        // Reset some fields that allow to be different before comparing.
193        ignore_field!(
194            fragment_id,
195            dml_fragment_id,
196            definition,
197            created_at_epoch,
198            created_at_cluster_version,
199            initialized_at_epoch,
200            initialized_at_cluster_version,
201            create_type,
202            stream_job_status,
203            conflict_behavior,
204        );
205
206        if new_table != original_table {
207            return Err(ErrorCode::NotSupported(
208                "incompatible altering on the target materialized view".to_owned(),
209                format!(
210                    "diff between the original and the new materialized view:\n{}",
211                    pretty_assertions::Comparison::new(&original_table, &new_table)
212                ),
213            )
214            .into());
215        }
216    }
217
218    let catalog_writer = session.catalog_writer()?;
219    catalog_writer
220        .replace_materialized_view(table.to_prost(), graph)
221        .await?;
222
223    Ok(PgResponse::empty_result(
224        StatementType::ALTER_MATERIALIZED_VIEW,
225    ))
226}