risingwave_frontend/handler/
alter_mv.rs1use std::collections::HashSet;
16use std::sync::Arc;
17
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{ConflictBehavior, FunctionId};
20use risingwave_common::hash::VnodeCount;
21use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query, Statement};
22
23use super::{HandlerArgs, RwPgResponse};
24use crate::TableCatalog;
25use crate::binder::{Binder, BoundQuery};
26use crate::catalog::TableId;
27use crate::catalog::root_catalog::SchemaPath;
28use crate::catalog::table_catalog::TableType;
29use crate::error::{ErrorCode, Result};
30use crate::handler::create_mv;
31use crate::session::SessionImpl;
32
33pub fn fetch_mv_catalog_for_alter(
36 session: &SessionImpl,
37 mv_name: &ObjectName,
38) -> Result<Arc<TableCatalog>> {
39 let db_name = &session.database();
40 let (schema_name, real_mv_name) =
41 Binder::resolve_schema_qualified_name(db_name, mv_name.clone())?;
42 let search_path = session.config().search_path();
43 let user_name = &session.user_name();
44
45 let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);
46
47 let original_catalog = {
48 let reader = session.env().catalog_reader().read_guard();
49 let (table, schema_name) =
50 reader.get_created_table_by_name(db_name, schema_path, &real_mv_name)?;
51
52 match table.table_type() {
53 TableType::MaterializedView => {}
54
55 _ => Err(ErrorCode::InvalidInputSyntax(format!(
56 "\"{mv_name}\" is not a materialized view or cannot be altered"
57 )))?,
58 }
59
60 session.check_privilege_for_drop_alter(schema_name, &**table)?;
61
62 table.clone()
63 };
64
65 Ok(original_catalog)
66}
67
68pub async fn handle_alter_mv(
70 handler_args: HandlerArgs,
71 name: ObjectName,
72 new_query: Box<Query>,
73) -> Result<RwPgResponse> {
74 let session = handler_args.session.clone();
75 let original_catalog = fetch_mv_catalog_for_alter(session.as_ref(), &name)?;
76
77 let original_definition = original_catalog.create_sql_ast()?;
79
80 let (columns, with_options, emit_mode) = match &original_definition {
82 Statement::CreateView {
83 columns,
84 with_options,
85 emit_mode,
86 ..
87 } => (columns.clone(), with_options.clone(), emit_mode.clone()),
88 _ => {
89 return Err(ErrorCode::InternalError(format!(
90 "Expected CREATE MATERIALIZED VIEW statement, got: {:?}",
91 original_definition
92 ))
93 .into());
94 }
95 };
96
97 let new_definition = Statement::CreateView {
99 or_replace: false,
100 materialized: true,
101 if_not_exists: false,
102 name: name.clone(),
103 columns: columns.clone(),
104 query: new_query.clone(),
105 with_options,
106 emit_mode: emit_mode.clone(),
107 };
108 let handler_args = HandlerArgs::new(session.clone(), &new_definition, Arc::from(""))?;
109
110 let (dependent_relations, dependent_udfs, bound_query) = {
111 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
112 let bound_query = binder.bind_query(*new_query)?;
113 (
114 binder.included_relations().clone(),
115 binder.included_udfs().clone(),
116 bound_query,
117 )
118 };
119
120 handle_alter_mv_bound(
121 handler_args,
122 name,
123 bound_query,
124 dependent_relations,
125 dependent_udfs,
126 columns,
127 emit_mode,
128 original_catalog,
129 )
130 .await
131}
132
133async fn handle_alter_mv_bound(
134 handler_args: HandlerArgs,
135 name: ObjectName,
136 query: BoundQuery,
137 dependent_relations: HashSet<TableId>,
138 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
140 emit_mode: Option<EmitMode>,
141 original_catalog: Arc<TableCatalog>,
142) -> Result<RwPgResponse> {
143 let session = handler_args.session.clone();
144
145 let (mut table, graph, _dependencies, _resource_group) = {
148 create_mv::gen_create_mv_graph(
149 handler_args,
150 name,
151 query,
152 dependent_relations,
153 dependent_udfs,
154 columns,
155 emit_mode,
156 )
157 .await?
158 };
159
160 table.conflict_behavior = ConflictBehavior::Overwrite;
163
164 table.id = original_catalog.id;
166 assert!(
167 table.incoming_sinks.is_empty(),
168 "materialized view should not have incoming sinks"
169 );
170 table.vnode_count = VnodeCount::set(original_catalog.vnode_count());
171
172 {
178 let mut new_table = TableCatalog::from(table.to_prost());
181 let mut original_table = original_catalog.as_ref().clone();
182
183 macro_rules! ignore_field {
184 ($($field:ident) ,* $(,)?) => {
185 $(
186 new_table.$field = Default::default();
187 original_table.$field = Default::default();
188 )*
189 };
190 }
191
192 ignore_field!(
194 fragment_id,
195 dml_fragment_id,
196 definition,
197 created_at_epoch,
198 created_at_cluster_version,
199 initialized_at_epoch,
200 initialized_at_cluster_version,
201 create_type,
202 stream_job_status,
203 conflict_behavior,
204 );
205
206 if new_table != original_table {
207 return Err(ErrorCode::NotSupported(
208 "incompatible altering on the target materialized view".to_owned(),
209 format!(
210 "diff between the original and the new materialized view:\n{}",
211 pretty_assertions::Comparison::new(&original_table, &new_table)
212 ),
213 )
214 .into());
215 }
216 }
217
218 let catalog_writer = session.catalog_writer()?;
219 catalog_writer
220 .replace_materialized_view(table.to_prost(), graph)
221 .await?;
222
223 Ok(PgResponse::empty_result(
224 StatementType::ALTER_MATERIALIZED_VIEW,
225 ))
226}