risingwave_frontend/handler/
alter_mv.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::TableCatalog;
25use crate::binder::{Binder, BoundQuery};
26use crate::catalog::TableId;
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33pub fn fetch_mv_catalog_for_alter(
36 session: &SessionImpl,
37 mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39 let db_name = &session.database();
40 let (schema_name, real_mv_name) = Binder::resolve_schema_qualified_name(db_name, mv_name)?;
41 let search_path = session.config().search_path();
42 let user_name = &session.user_name();
43
44 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
45
46 let original_catalog = {
47 let reader = session.env().catalog_reader().read_guard();
48 let (table, schema_name) =
49 reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
50
51 match table.table_type() {
52 TableType::MaterializedView => {}
53
54 _ => Err(ErrorCode::InvalidInputSyntax(format!(
55 "\"{mv_name}\" is not a materialized view or cannot be altered"
56 )))?,
57 }
58
59 session.check_privilege_for_drop_alter(schema_name, &**table)?;
60
61 table.clone()
62 };
63
64 Ok(original_catalog)
65}
66
67pub async fn handle_alter_mv(
69 handler_args: HandlerArgs,
70 name: ObjectName,
71 new_query: Box<Query>,
72) -> Result<RwPgResponse> {
73 let session = handler_args.session.clone();
74 let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
75
76 let original_definition = original_catalog.create_sql_ast()?;
78
79 let (columns, with_options, emit_mode) = match &original_definition {
81 Statement::CreateView {
82 columns,
83 with_options,
84 emit_mode,
85 ..
86 } => (columns.clone(), with_options.clone(), emit_mode.clone()),
87 _ => {
88 return Err(ErrorCode::InternalError(format!(
89 "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
90 original_definition
91 ))
92 .into());
93 }
94 };
95
96 let new_definition = Statement::CreateView {
98 or_replace: false,
99 materialized: true,
100 if_not_exists: false,
101 name: name.clone(),
102 columns: columns.clone(),
103 query: new_query.clone(),
104 with_options,
105 emit_mode: emit_mode.clone(),
106 };
107 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
108
109 let (dependent_relations, dependent_udfs, bound_query) = {
110 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
111 let bound_query = binder.bind_query(&new_query)?;
112 (
113 binder.included_relations().clone(),
114 binder.included_udfs().clone(),
115 bound_query,
116 )
117 };
118
119 handle_alter_mv_bound(
120 handler_args,
121 name,
122 bound_query,
123 dependent_relations,
124 dependent_udfs,
125 columns,
126 emit_mode,
127 original_catalog,
128 )
129 .await
130}
131
132async fn handle_alter_mv_bound(
133 handler_args: HandlerArgs,
134 name: ObjectName,
135 query: BoundQuery,
136 dependent_relations: HashSet<TableId>,
137 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
139 emit_mode: Option<EmitMode>,
140 original_catalog: Arc<TableCatalog>,
141) -> Result<RwPgResponse> {
142 let session = handler_args.session.clone();
143
144 let (mut table, graph, _dependencies, _resource_group) = {
147 create_mv::gen_create_mv_graph(
148 handler_args,
149 name,
150 query,
151 dependent_relations,
152 dependent_udfs,
153 columns,
154 emit_mode,
155 )
156 .await?
157 };
158
159 table.conflict_behavior = ConflictBehavior::Overwrite;
162
163 table.id = original_catalog.id;
165 assert!(
166 table.incoming_sinks.is_empty(),
167 "materialized view should not have incoming sinks"
168 );
169 table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
170
171 {
177 let mut new_table = TableCatalog::from(table.to_prost());
180 let mut original_table = original_catalog.as_ref().clone();
181
182 macro_rules! ignore_field {
183 ($($field:ident) ,* $(,)?) => {
184 $(
185 new_table.$field = Default::default();
186 original_table.$field = Default::default();
187 )*
188 };
189 }
190
191 ignore_field!(
193 fragment_id,
194 dml_fragment_id,
195 definition,
196 created_at_epoch,
197 created_at_cluster_version,
198 initialized_at_epoch,
199 initialized_at_cluster_version,
200 create_type,
201 stream_job_status,
202 conflict_behavior,
203 );
204
205 if new_table != original_table {
206 return Err(ErrorCode::NotSupported(
207 "incompatible altering on the target materialized view".to_owned(),
208 format!(
209 "diff between the original and the new materialized view:\n{}",
210 pretty_assertions::Comparison::new(&original_table, &new_table)
211 ),
212 )
213 .into());
214 }
215 }
216
217 let catalog_writer = session.catalog_writer()?;
218 catalog_writer
219 .replace_materialized_view(table.to_prost(), graph)
220 .await?;
221
222 Ok(PgResponse::empty_result(
223 StatementType::ALTER_MATERIALIZED_VIEW,
224 ))
225}