risingwave_frontend/handler/
alter_mv.rs

1// Copyright 2025 RisingWave Labs
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::TableCatalog;
25use crate::binder::{Binder, BoundQuery};
26use crate::catalog::TableId;
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33/// Fetch materialized view catalog for alter operations, similar to `fetch_table_catalog_for_alter`
34/// but checks for `TableType::MaterializedView`
35pub fn fetch_mv_catalog_for_alter(
36    session: &SessionImpl,
37    mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39    let db_name = &session.database();
40    let (schema_name, real_mv_name) = Binder::resolve_schema_qualified_name(db_name, mv_name)?;
41    let search_path = session.config().search_path();
42    let user_name = &session.user_name();
43
44    let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46    let original_catalog = {
47        let reader = session.env().catalog_reader().read_guard();
48        let (table, schema_name) =
49            reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
50
51        match table.table_type() {
52            TableType::MaterializedView => {}
53
54            _ => Err(ErrorCode::InvalidInputSyntax(format!(
55                "\"{mv_name}\" is not a materialized view or cannot be altered"
56            )))?,
57        }
58
59        session.check_privilege_for_drop_alter(schema_name, &**table)?;
60
61        table.clone()
62    };
63
64    Ok(original_catalog)
65}
66
67// TODO(alter-mv): The current implementation is a WIP and may not work at all yet.
68pub async fn handle_alter_mv(
69    handler_args: HandlerArgs,
70    name: ObjectName,
71    new_query: Box<Query>,
72) -> Result<RwPgResponse> {
73    let session = handler_args.session.clone();
74    let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
75
76    // Retrieve the original MV definition and parse it to AST
77    let original_definition = original_catalog.create_sql_ast()?;
78
79    // Extract unchanged parts from the original definition
80    let (columns, with_options, emit_mode) = match &original_definition {
81        Statement::CreateView {
82            columns,
83            with_options,
84            emit_mode,
85            ..
86        } => (columns.clone(), with_options.clone(), emit_mode.clone()),
87        _ => {
88            return Err(ErrorCode::InternalError(format!(
89                "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
90                original_definition
91            ))
92            .into());
93        }
94    };
95
96    // Create a new CREATE MATERIALIZED VIEW statement with the new query
97    let new_definition = Statement::CreateView {
98        or_replace: false,
99        materialized: true,
100        if_not_exists: false,
101        name: name.clone(),
102        columns: columns.clone(),
103        query: new_query.clone(),
104        with_options,
105        emit_mode: emit_mode.clone(),
106    };
107    let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
108
109    let (dependent_relations, dependent_udfs, bound_query) = {
110        let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
111        let bound_query = binder.bind_query(&new_query)?;
112        (
113            binder.included_relations().clone(),
114            binder.included_udfs().clone(),
115            bound_query,
116        )
117    };
118
119    handle_alter_mv_bound(
120        handler_args,
121        name,
122        bound_query,
123        dependent_relations,
124        dependent_udfs,
125        columns,
126        emit_mode,
127        original_catalog,
128    )
129    .await
130}
131
132async fn handle_alter_mv_bound(
133    handler_args: HandlerArgs,
134    name: ObjectName,
135    query: BoundQuery,
136    dependent_relations: HashSet<TableId>,
137    dependent_udfs: HashSet<FunctionId>, // TODO(rc): merge with `dependent_relations`
138    columns: Vec<Ident>,
139    emit_mode: Option<EmitMode>,
140    original_catalog: Arc<TableCatalog>,
141) -> Result<RwPgResponse> {
142    let session = handler_args.session.clone();
143
144    // TODO(alter-mv): use `ColumnIdGenerator` to generate IDs for MV columns, in order to
145    // support schema changes.
146    let (mut table, graph, _dependencies, _resource_group) = {
147        create_mv::gen_create_mv_graph(
148            handler_args,
149            name,
150            query,
151            dependent_relations,
152            dependent_udfs,
153            columns,
154            emit_mode,
155        )
156        .await?
157    };
158
159    // After alter, the data of the MV is not guaranteed to be consistent.
160    // Always set the conflict handler to avoid producing inconsistent changes to downstream.
161    table.conflict_behavior = ConflictBehavior::Overwrite;
162
163    // Set some fields ourselves so that the meta service does not need to maintain them.
164    table.id = original_catalog.id;
165    assert!(
166        table.incoming_sinks.is_empty(),
167        "materialized view should not have incoming sinks"
168    );
169    table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
170
171    // TODO(alter-mv): check changes on dependencies
172
173    // Validate if the new table is compatible with the original one.
174    // Internal tables will be checked in the meta service.
175    // TODO(alter-mv): improve this to make it more robust and friendly.
176    {
177        // Convert back and forth to normalize the `rw_timestamp` column.
178        // TODO: make `rw_timestamp` fully virtual to avoid this workaround.
179        let mut new_table = TableCatalog::from(table.to_prost());
180        let mut original_table = original_catalog.as_ref().clone();
181
182        macro_rules! ignore_field {
183            ($($field:ident) ,* $(,)?) => {
184                $(
185                    new_table.$field = Default::default();
186                    original_table.$field = Default::default();
187                )*
188            };
189        }
190
191        // Reset some fields that allow to be different before comparing.
192        ignore_field!(
193            fragment_id,
194            dml_fragment_id,
195            definition,
196            created_at_epoch,
197            created_at_cluster_version,
198            initialized_at_epoch,
199            initialized_at_cluster_version,
200            create_type,
201            stream_job_status,
202            conflict_behavior,
203        );
204
205        if new_table != original_table {
206            return Err(ErrorCode::NotSupported(
207                "incompatible altering on the target materialized view".to_owned(),
208                format!(
209                    "diff between the original and the new materialized view:\n{}",
210                    pretty_assertions::Comparison::new(&original_table, &new_table)
211                ),
212            )
213            .into());
214        }
215    }
216
217    let catalog_writer = session.catalog_writer()?;
218    catalog_writer
219        .replace_materialized_view(table.to_prost(), graph)
220        .await?;
221
222    Ok(PgResponse::empty_result(
223        StatementType::ALTER_MATERIALIZED_VIEW,
224    ))
225}