risingwave_frontend/handler/
alter_mv.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId, SecretId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_common::id::ObjectId;
22use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
23
24use super::{HandlerArgs, RwPgResponse};
25use crate::TableCatalog;
26use crate::binder::{Binder, BoundQuery};
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33pub fn fetch_mv_catalog_for_alter(
36 session: &SessionImpl,
37 mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39 let db_name = &session.database();
40 let (schema_name, real_mv_name) = Binder::resolve_schema_qualified_name(db_name, mv_name)?;
41 let search_path = session.config().search_path();
42 let user_name = &session.user_name();
43
44 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46 let original_catalog = {
47 let reader = session.env().catalog_reader().read_guard();
48 let (table, schema_name) =
49 reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
50
51 match table.table_type() {
52 TableType::MaterializedView => {}
53
54 _ => Err(ErrorCode::InvalidInputSyntax(format!(
55 "\"{mv_name}\" is not a materialized view or cannot be altered"
56 )))?,
57 }
58
59 session.check_privilege_for_drop_alter(schema_name, &**table)?;
60
61 table.clone()
62 };
63
64 Ok(original_catalog)
65}
66
67pub async fn handle_alter_mv(
69 handler_args: HandlerArgs,
70 name: ObjectName,
71 new_query: Box<Query>,
72) -> Result<RwPgResponse> {
73 let session = handler_args.session.clone();
74 let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
75
76 let original_definition = original_catalog.create_sql_ast()?;
78
79 let (columns, with_options, emit_mode) = match &original_definition {
81 Statement::CreateView {
82 columns,
83 with_options,
84 emit_mode,
85 ..
86 } => (columns.clone(), with_options.clone(), emit_mode.clone()),
87 _ => {
88 return Err(ErrorCode::InternalError(format!(
89 "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
90 original_definition
91 ))
92 .into());
93 }
94 };
95
96 let new_definition = Statement::CreateView {
98 or_replace: false,
99 materialized: true,
100 if_not_exists: false,
101 name: name.clone(),
102 columns: columns.clone(),
103 query: new_query.clone(),
104 with_options,
105 emit_mode: emit_mode.clone(),
106 };
107 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
108
109 let (dependent_relations, dependent_udfs, dependent_secrets, bound_query) = {
110 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
111 let bound_query = binder.bind_query(&new_query)?;
112 (
113 binder.included_relations().clone(),
114 binder.included_udfs().clone(),
115 binder.included_secrets().clone(),
116 bound_query,
117 )
118 };
119
120 handle_alter_mv_bound(
121 handler_args,
122 name,
123 bound_query,
124 dependent_relations,
125 dependent_udfs,
126 dependent_secrets,
127 columns,
128 emit_mode,
129 original_catalog,
130 )
131 .await
132}
133
134async fn handle_alter_mv_bound(
135 handler_args: HandlerArgs,
136 name: ObjectName,
137 query: BoundQuery,
138 dependent_relations: HashSet<ObjectId>,
139 dependent_udfs: HashSet<FunctionId>, dependent_secrets: HashSet<SecretId>,
141 columns: Vec<Ident>,
142 emit_mode: Option<EmitMode>,
143 original_catalog: Arc<TableCatalog>,
144) -> Result<RwPgResponse> {
145 let session = handler_args.session.clone();
146
147 let (mut table, graph, _dependencies, _resource_group, refresh_interval_sec) = {
150 create_mv::gen_create_mv_graph(
151 handler_args,
152 name,
153 query,
154 dependent_relations,
155 dependent_udfs,
156 dependent_secrets,
157 columns,
158 emit_mode,
159 )
160 .await?
161 };
162
163 if refresh_interval_sec.is_some() {
164 return Err(ErrorCode::InvalidInputSyntax(
165 "ALTER MATERIALIZED VIEW is not supported for batch refresh materialized views"
166 .to_owned(),
167 )
168 .into());
169 }
170
171 table.conflict_behavior = ConflictBehavior::Overwrite;
174
175 table.id = original_catalog.id;
177 table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
178
179 {
185 let mut new_table = TableCatalog::from(table.to_prost());
188 let mut original_table = original_catalog.as_ref().clone();
189
190 macro_rules! ignore_field {
191 ($($field:ident) ,* $(,)?) => {
192 $(
193 new_table.$field = Default::default();
194 original_table.$field = Default::default();
195 )*
196 };
197 }
198
199 ignore_field!(
201 fragment_id,
202 dml_fragment_id,
203 definition,
204 created_at_epoch,
205 created_at_cluster_version,
206 initialized_at_epoch,
207 initialized_at_cluster_version,
208 create_type,
209 stream_job_status,
210 conflict_behavior,
211 );
212
213 if new_table != original_table {
214 return Err(ErrorCode::NotSupported(
215 "incompatible altering on the target materialized view".to_owned(),
216 format!(
217 "diff between the original and the new materialized view:\n{}",
218 pretty_assertions::Comparison::new(&original_table, &new_table)
219 ),
220 )
221 .into());
222 }
223 }
224
225 let catalog_writer = session.catalog_writer()?;
226 catalog_writer
227 .replace_materialized_view(table.to_prost(), graph)
228 .await?;
229
230 Ok(PgResponse::empty_result(
231 StatementType::ALTER_MATERIALIZED_VIEW,
232 ))
233}