risingwave_frontend/handler/
alter_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId, SecretId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::id::ObjectId;
22use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
23
24use super::{HandlerArgs, RwPgResponse};
25use crate::TableCatalog;
26use crate::binder::{Binder, BoundQuery};
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33/// Fetch materialized view catalog for alter operations, similar to `fetch_table_catalog_for_alter`
34/// but checks for `TableType::MaterializedView`
35pub fn fetch_mv_catalog_for_alter(
36    session: &SessionImpl,
37    mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39    let db_name = &session.database();
40    let (schema_name, real_mv_name) = Binder::resolve_schema_qualified_name(db_name, mv_name)?;
41    let search_path = session.config().search_path();
42    let user_name = &session.user_name();
43
44    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46    let original_catalog = {
47        let reader = session.env().catalog_reader().read_guard();
48        let (table, schema_name) =
49            reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
50
51        match table.table_type() {
52            TableType::MaterializedView => {}
53
54            _ => Err(ErrorCode::InvalidInputSyntax(format!(
55                "\"{mv_name}\" is not a materialized view or cannot be altered"
56            )))?,
57        }
58
59        session.check_privilege_for_drop_alter(schema_name, &**table)?;
60
61        table.clone()
62    };
63
64    Ok(original_catalog)
65}
66
67// TODO(alter-mv): The current implementation is a WIP and may not work at all yet.
68pub async fn handle_alter_mv(
69    handler_args: HandlerArgs,
70    name: ObjectName,
71    new_query: Box<Query>,
72) -> Result<RwPgResponse> {
73    let session = handler_args.session.clone();
74    let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
75
76    // Retrieve the original MV definition and parse it to AST
77    let original_definition = original_catalog.create_sql_ast()?;
78
79    // Extract unchanged parts from the original definition
80    let (columns, with_options, emit_mode) = match &original_definition {
81        Statement::CreateView {
82            columns,
83            with_options,
84            emit_mode,
85            ..
86        } => (columns.clone(), with_options.clone(), emit_mode.clone()),
87        _ => {
88            return Err(ErrorCode::InternalError(format!(
89                "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
90                original_definition
91            ))
92            .into());
93        }
94    };
95
96    // Create a new CREATE MATERIALIZED VIEW statement with the new query
97    let new_definition = Statement::CreateView {
98        or_replace: false,
99        materialized: true,
100        if_not_exists: false,
101        name: name.clone(),
102        columns: columns.clone(),
103        query: new_query.clone(),
104        with_options,
105        emit_mode: emit_mode.clone(),
106    };
107    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
108
109    let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
110        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
111        let bound_query = binder.bind_query(&new_query)?;
112        (
113            binder.included_relations().clone(),
114            binder.included_udfs().clone(),
115            binder.included_secrets().clone(),
116            bound_query,
117        )
118    };
119
120    handle_alter_mv_bound(
121        handler_args,
122        name,
123        bound_query,
124        dependent_relations,
125        dependent_udfs,
126        dependent_secrets,
127        columns,
128        emit_mode,
129        original_catalog,
130    )
131    .await
132}
133
134async fn handle_alter_mv_bound(
135    handler_args: HandlerArgs,
136    name: ObjectName,
137    query: BoundQuery,
138    dependent_relations: HashSet<ObjectId>,
139    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
140    dependent_secrets: HashSet<SecretId>,
141    columns: Vec<Ident>,
142    emit_mode: Option<EmitMode>,
143    original_catalog: Arc<TableCatalog>,
144) -> Result<RwPgResponse> {
145    let session = handler_args.session.clone();
146
147    // TODO(alter-mv): use `ColumnIdGenerator` to generate IDs for MV columns, in order to
148    // support schema changes.
149    let (mut table, graph, _dependencies, _resource_group) = {
150        create_mv::gen_create_mv_graph(
151            handler_args,
152            name,
153            query,
154            dependent_relations,
155            dependent_udfs,
156            dependent_secrets,
157            columns,
158            emit_mode,
159        )
160        .await?
161    };
162
163    // After alter, the data of the MV is not guaranteed to be consistent.
164    // Always set the conflict handler to avoid producing inconsistent changes to downstream.
165    table.conflict_behavior = ConflictBehavior::Overwrite;
166
167    // Set some fields ourselves so that the meta service does not need to maintain them.
168    table.id = original_catalog.id;
169    table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
170
171    // TODO(alter-mv): check changes on dependencies
172
173    // Validate if the new table is compatible with the original one.
174    // Internal tables will be checked in the meta service.
175    // TODO(alter-mv): improve this to make it more robust and friendly.
176    {
177        // Convert back and forth to normalize the `rw_timestamp` column.
178        // TODO: make `rw_timestamp` fully virtual to avoid this workaround.
179        let mut new_table = TableCatalog::from(table.to_prost());
180        let mut original_table = original_catalog.as_ref().clone();
181
182        macro_rules! ignore_field {
183            ($($field:ident) ,* $(,)?) => {
184                $(
185                    new_table.$field = Default::default();
186                    original_table.$field = Default::default();
187                )*
188            };
189        }
190
191        // Reset some fields that allow to be different before comparing.
192        ignore_field!(
193            fragment_id,
194            dml_fragment_id,
195            definition,
196            created_at_epoch,
197            created_at_cluster_version,
198            initialized_at_epoch,
199            initialized_at_cluster_version,
200            create_type,
201            stream_job_status,
202            conflict_behavior,
203        );
204
205        if new_table != original_table {
206            return Err(ErrorCode::NotSupported(
207                "incompatible altering on the target materialized view".to_owned(),
208                format!(
209                    "diff between the original and the new materialized view:\n{}",
210                    pretty_assertions::Comparison::new(&original_table, &new_table)
211                ),
212            )
213            .into());
214        }
215    }
216
217    let catalog_writer = session.catalog_writer()?;
218    catalog_writer
219        .replace_materialized_view(table.to_prost(), graph)
220        .await?;
221
222    Ok(PgResponse::empty_result(
223        StatementType::ALTER_MATERIALIZED_VIEW,
224    ))
225}