risingwave_frontend/handler/
alter_mv.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId, SecretId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::id::ObjectId;
22use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
23
24use super::{HandlerArgs, RwPgResponse};
25use crate::TableCatalog;
26use crate::binder::{Binder, BoundQuery};
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33pub fn fetch_mv_catalog_for_alter(
36 session: &SessionImpl,
37 mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39 let db_name = &session.database();
40 let (schema_name, real_mv_name) = Binder::resolve_schema_qualified_name(db_name, mv_name)?;
41 let search_path = session.config().search_path();
42 let user_name = &session.user_name();
43
44 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46 let original_catalog = {
47 let reader = session.env().catalog_reader().read_guard();
48 let (table, schema_name) =
49 reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
50
51 match table.table_type() {
52 TableType::MaterializedView => {}
53
54 _ => Err(ErrorCode::InvalidInputSyntax(format!(
55 "\"{mv_name}\" is not a materialized view or cannot be altered"
56 )))?,
57 }
58
59 session.check_privilege_for_drop_alter(schema_name, &**table)?;
60
61 table.clone()
62 };
63
64 Ok(original_catalog)
65}
66
67pub async fn handle_alter_mv(
69 handler_args: HandlerArgs,
70 name: ObjectName,
71 new_query: Box<Query>,
72) -> Result<RwPgResponse> {
73 let session = handler_args.session.clone();
74 let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
75
76 let original_definition = original_catalog.create_sql_ast()?;
78
79 let (columns, with_options, emit_mode) = match &original_definition {
81 Statement::CreateView {
82 columns,
83 with_options,
84 emit_mode,
85 ..
86 } => (columns.clone(), with_options.clone(), emit_mode.clone()),
87 _ => {
88 return Err(ErrorCode::InternalError(format!(
89 "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
90 original_definition
91 ))
92 .into());
93 }
94 };
95
96 let new_definition = Statement::CreateView {
98 or_replace: false,
99 materialized: true,
100 if_not_exists: false,
101 name: name.clone(),
102 columns: columns.clone(),
103 query: new_query.clone(),
104 with_options,
105 emit_mode: emit_mode.clone(),
106 };
107 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
108
109 let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
110 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
111 let bound_query = binder.bind_query(&new_query)?;
112 (
113 binder.included_relations().clone(),
114 binder.included_udfs().clone(),
115 binder.included_secrets().clone(),
116 bound_query,
117 )
118 };
119
120 handle_alter_mv_bound(
121 handler_args,
122 name,
123 bound_query,
124 dependent_relations,
125 dependent_udfs,
126 dependent_secrets,
127 columns,
128 emit_mode,
129 original_catalog,
130 )
131 .await
132}
133
134async fn handle_alter_mv_bound(
135 handler_args: HandlerArgs,
136 name: ObjectName,
137 query: BoundQuery,
138 dependent_relations: HashSet<ObjectId>,
139 dependent_udfs: HashSet<FunctionId>, dependent_secrets: HashSet<SecretId>,
141 columns: Vec<Ident>,
142 emit_mode: Option<EmitMode>,
143 original_catalog: Arc<TableCatalog>,
144) -> Result<RwPgResponse> {
145 let session = handler_args.session.clone();
146
147 let (mut table, graph, _dependencies, _resource_group) = {
150 create_mv::gen_create_mv_graph(
151 handler_args,
152 name,
153 query,
154 dependent_relations,
155 dependent_udfs,
156 dependent_secrets,
157 columns,
158 emit_mode,
159 )
160 .await?
161 };
162
163 table.conflict_behavior = ConflictBehavior::Overwrite;
166
167 table.id = original_catalog.id;
169 table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
170
171 {
177 let mut new_table = TableCatalog::from(table.to_prost());
180 let mut original_table = original_catalog.as_ref().clone();
181
182 macro_rules! ignore_field {
183 ($($field:ident) ,* $(,)?) => {
184 $(
185 new_table.$field = Default::default();
186 original_table.$field = Default::default();
187 )*
188 };
189 }
190
191 ignore_field!(
193 fragment_id,
194 dml_fragment_id,
195 definition,
196 created_at_epoch,
197 created_at_cluster_version,
198 initialized_at_epoch,
199 initialized_at_cluster_version,
200 create_type,
201 stream_job_status,
202 conflict_behavior,
203 );
204
205 if new_table != original_table {
206 return Err(ErrorCode::NotSupported(
207 "incompatible altering on the target materialized view".to_owned(),
208 format!(
209 "diff between the original and the new materialized view:\n{}",
210 pretty_assertions::Comparison::new(&original_table, &new_table)
211 ),
212 )
213 .into());
214 }
215 }
216
217 let catalog_writer = session.catalog_writer()?;
218 catalog_writer
219 .replace_materialized_view(table.to_prost(), graph)
220 .await?;
221
222 Ok(PgResponse::empty_result(
223 StatementType::ALTER_MATERIALIZED_VIEW,
224 ))
225}