risingwave_frontend/handler/
create_mv.rs1use std::collections::HashSet;
16
17use either::Either;
18use pgwire::pg_response::{PgResponse, StatementType};
19use risingwave_common::catalog::{FunctionId, ObjectId, TableId};
20use risingwave_pb::catalog::PbTable;
21use risingwave_pb::serverless_backfill_controller::{
22 ProvisionRequest, node_group_controller_service_client,
23};
24use risingwave_sqlparser::ast::{EmitMode, Ident, ObjectName, Query};
25use thiserror_ext::AsReport;
26
27use super::RwPgResponse;
28use crate::WithOptions;
29use crate::binder::{Binder, BoundQuery, BoundSetExpr};
30use crate::catalog::check_column_name_not_reserved;
31use crate::error::ErrorCode::{InvalidInputSyntax, ProtocolError};
32use crate::error::{ErrorCode, Result, RwError};
33use crate::handler::HandlerArgs;
34use crate::optimizer::plan_node::Explain;
35use crate::optimizer::plan_node::generic::GenericPlanRef;
36use crate::optimizer::{OptimizerContext, OptimizerContextRef, PlanRef, RelationCollectorVisitor};
37use crate::planner::Planner;
38use crate::scheduler::streaming_manager::CreatingStreamingJobInfo;
39use crate::session::{SESSION_MANAGER, SessionImpl};
40use crate::stream_fragmenter::build_graph;
41use crate::utils::ordinal;
42
/// Key of the `WITH` option that names the resource group handed to the catalog writer when the
/// materialized view is created.
pub const RESOURCE_GROUP_KEY: &str = "resource_group";
/// Key of the `WITH` option that requests serverless backfill. Its value is parsed as a `bool`.
pub const CLOUD_SERVERLESS_BACKFILL_ENABLED: &str = "cloud.serverless_backfill_enabled";
45
46pub(super) fn parse_column_names(columns: &[Ident]) -> Option<Vec<String>> {
47 if columns.is_empty() {
48 None
49 } else {
50 Some(columns.iter().map(|v| v.real_value()).collect())
51 }
52}
53
54pub(super) fn get_column_names(
59 bound: &BoundQuery,
60 columns: Vec<Ident>,
61) -> Result<Option<Vec<String>>> {
62 let col_names = parse_column_names(&columns);
63 if let BoundSetExpr::Select(select) = &bound.body {
64 if col_names.is_none() {
69 for (i, alias) in select.aliases.iter().enumerate() {
70 if alias.is_none() {
71 return Err(ErrorCode::BindError(format!(
72 "An alias must be specified for the {} expression (counting from 1) in result relation", ordinal(i+1)
73 ))
74 .into());
75 }
76 }
77 }
78 }
79
80 Ok(col_names)
81}
82
83pub fn gen_create_mv_plan(
85 session: &SessionImpl,
86 context: OptimizerContextRef,
87 query: Query,
88 name: ObjectName,
89 columns: Vec<Ident>,
90 emit_mode: Option<EmitMode>,
91) -> Result<(PlanRef, PbTable)> {
92 let mut binder = Binder::new_for_stream(session);
93 let bound = binder.bind_query(query)?;
94 gen_create_mv_plan_bound(session, context, bound, name, columns, emit_mode)
95}
96
97pub fn gen_create_mv_plan_bound(
99 session: &SessionImpl,
100 context: OptimizerContextRef,
101 query: BoundQuery,
102 name: ObjectName,
103 columns: Vec<Ident>,
104 emit_mode: Option<EmitMode>,
105) -> Result<(PlanRef, PbTable)> {
106 if session.config().create_compaction_group_for_mv() {
107 context.warn_to_user("The session variable CREATE_COMPACTION_GROUP_FOR_MV has been deprecated. It will not take effect.");
108 }
109
110 let db_name = &session.database();
111 let (schema_name, table_name) = Binder::resolve_schema_qualified_name(db_name, name)?;
112
113 let (database_id, schema_id) = session.get_database_and_schema_id_for_create(schema_name)?;
114
115 let definition = context.normalized_sql().to_owned();
116
117 let col_names = get_column_names(&query, columns)?;
118
119 let emit_on_window_close = emit_mode == Some(EmitMode::OnWindowClose);
120 if emit_on_window_close {
121 context.warn_to_user("EMIT ON WINDOW CLOSE is currently an experimental feature. Please use it with caution.");
122 }
123
124 let mut plan_root = Planner::new_for_stream(context).plan_query(query)?;
125 plan_root.set_req_dist_as_same_as_req_order();
126 if let Some(col_names) = col_names {
127 for name in &col_names {
128 check_column_name_not_reserved(name)?;
129 }
130 plan_root.set_out_names(col_names)?;
131 }
132 let materialize = plan_root.gen_materialize_plan(
133 database_id,
134 schema_id,
135 table_name,
136 definition,
137 emit_on_window_close,
138 )?;
139 let mut table = materialize.table().to_prost();
140
141 let plan: PlanRef = materialize.into();
142
143 table.owner = session.user_id();
144
145 let ctx = plan.ctx();
146 let explain_trace = ctx.is_explain_trace();
147 if explain_trace {
148 ctx.trace("Create Materialized View:");
149 ctx.trace(plan.explain_to_string());
150 }
151
152 Ok((plan, table))
153}
154
155pub async fn handle_create_mv(
156 handler_args: HandlerArgs,
157 if_not_exists: bool,
158 name: ObjectName,
159 query: Query,
160 columns: Vec<Ident>,
161 emit_mode: Option<EmitMode>,
162) -> Result<RwPgResponse> {
163 let (dependent_relations, dependent_udfs, bound) = {
164 let mut binder = Binder::new_for_stream(handler_args.session.as_ref());
165 let bound = binder.bind_query(query)?;
166 (
167 binder.included_relations().clone(),
168 binder.included_udfs().clone(),
169 bound,
170 )
171 };
172 handle_create_mv_bound(
173 handler_args,
174 if_not_exists,
175 name,
176 bound,
177 dependent_relations,
178 dependent_udfs,
179 columns,
180 emit_mode,
181 )
182 .await
183}
184
185pub async fn provision_resource_group(sbc_addr: String) -> Result<String> {
187 let request = tonic::Request::new(ProvisionRequest {});
188 let mut client =
189 node_group_controller_service_client::NodeGroupControllerServiceClient::connect(
190 sbc_addr.clone(),
191 )
192 .await
193 .map_err(|e| {
194 RwError::from(ErrorCode::InternalError(format!(
195 "unable to reach serverless backfill controller at addr {}: {}",
196 sbc_addr,
197 e.as_report()
198 )))
199 })?;
200
201 match client.provision(request).await {
202 Ok(resp) => Ok(resp.into_inner().resource_group),
203 Err(e) => Err(RwError::from(ErrorCode::InternalError(format!(
204 "serverless backfill controller returned error :{}",
205 e.as_report()
206 )))),
207 }
208}
209
210fn get_with_options(handler_args: HandlerArgs) -> WithOptions {
211 let context = OptimizerContext::from_handler_args(handler_args);
212 context.with_options().clone()
213}
214
215pub async fn handle_create_mv_bound(
216 handler_args: HandlerArgs,
217 if_not_exists: bool,
218 name: ObjectName,
219 query: BoundQuery,
220 dependent_relations: HashSet<TableId>,
221 dependent_udfs: HashSet<FunctionId>, columns: Vec<Ident>,
223 emit_mode: Option<EmitMode>,
224) -> Result<RwPgResponse> {
225 let session = handler_args.session.clone();
226
227 session.check_cluster_limits().await?;
229
230 if let Either::Right(resp) = session.check_relation_name_duplicated(
231 name.clone(),
232 StatementType::CREATE_MATERIALIZED_VIEW,
233 if_not_exists,
234 )? {
235 return Ok(resp);
236 }
237
238 let (table, graph, dependencies, resource_group) = {
239 let mut with_options = get_with_options(handler_args.clone());
240 let mut resource_group = with_options.remove(&RESOURCE_GROUP_KEY.to_owned());
241
242 let is_serverless_backfill = with_options
243 .remove(&CLOUD_SERVERLESS_BACKFILL_ENABLED.to_owned())
244 .unwrap_or_default()
245 .parse::<bool>()
246 .unwrap_or(false);
247
248 if resource_group.is_some() && is_serverless_backfill {
249 return Err(RwError::from(InvalidInputSyntax(
250 "Please do not specify serverless backfilling and resource group together"
251 .to_owned(),
252 )));
253 }
254
255 if !with_options.is_empty() {
256 return Err(RwError::from(ProtocolError(format!(
258 "unexpected options in WITH clause: {:?}",
259 with_options.keys()
260 ))));
261 }
262
263 let sbc_addr = match SESSION_MANAGER.get() {
264 Some(manager) => manager.env().sbc_address(),
265 None => "",
266 }
267 .to_owned();
268
269 if is_serverless_backfill && sbc_addr.is_empty() {
270 return Err(RwError::from(InvalidInputSyntax(
271 "Serverless Backfill is disabled on-premise. Use RisingWave cloud at https://cloud.risingwave.com/auth/signup to try this feature".to_owned(),
272 )));
273 }
274
275 if is_serverless_backfill {
276 match provision_resource_group(sbc_addr).await {
277 Err(e) => {
278 return Err(RwError::from(ProtocolError(format!(
279 "failed to provision serverless backfill nodes: {}",
280 e.as_report()
281 ))));
282 }
283 Ok(val) => resource_group = Some(val),
284 }
285 }
286 tracing::debug!(
287 resource_group = resource_group,
288 "provisioning on resource group"
289 );
290
291 let context = OptimizerContext::from_handler_args(handler_args);
292 let has_order_by = !query.order.is_empty();
293 if has_order_by {
294 context.warn_to_user(r#"The ORDER BY clause in the CREATE MATERIALIZED VIEW statement does not guarantee that the rows selected out of this materialized view is returned in this order.
295It only indicates the physical clustering of the data, which may improve the performance of queries issued against this materialized view.
296"#.to_owned());
297 }
298
299 if resource_group.is_some()
300 && !context
301 .session_ctx()
302 .config()
303 .streaming_use_arrangement_backfill()
304 {
305 return Err(RwError::from(ProtocolError("The session config arrangement backfill must be enabled to use the resource_group option".to_owned())));
306 }
307
308 let (plan, table) =
309 gen_create_mv_plan_bound(&session, context.into(), query, name, columns, emit_mode)?;
310
311 let dependencies =
314 RelationCollectorVisitor::collect_with(dependent_relations, plan.clone())
315 .into_iter()
316 .map(|id| id.table_id() as ObjectId)
317 .chain(
318 dependent_udfs
319 .into_iter()
320 .map(|id| id.function_id() as ObjectId),
321 )
322 .collect();
323
324 let graph = build_graph(plan)?;
325
326 (table, graph, dependencies, resource_group)
327 };
328
329 let _job_guard =
331 session
332 .env()
333 .creating_streaming_job_tracker()
334 .guard(CreatingStreamingJobInfo::new(
335 session.session_id(),
336 table.database_id,
337 table.schema_id,
338 table.name.clone(),
339 ));
340
341 let session = session.clone();
342 let catalog_writer = session.catalog_writer()?;
343 catalog_writer
344 .create_materialized_view(table, graph, dependencies, resource_group)
345 .await?;
346
347 Ok(PgResponse::empty_result(
348 StatementType::CREATE_MATERIALIZED_VIEW,
349 ))
350}
351
/// Tests for the `CREATE MATERIALIZED VIEW` handler, driven through `LocalFrontend::run_sql`.
#[cfg(test)]
pub mod tests {
    use std::collections::HashMap;

    use pgwire::pg_response::StatementType::CREATE_MATERIALIZED_VIEW;
    use risingwave_common::catalog::{
        DEFAULT_DATABASE_NAME, DEFAULT_SCHEMA_NAME, ROW_ID_COLUMN_NAME, RW_TIMESTAMP_COLUMN_NAME,
    };
    use risingwave_common::types::{DataType, StructType};

    use crate::catalog::root_catalog::SchemaPath;
    use crate::test_utils::{LocalFrontend, PROTO_FILE_DATA, create_proto_file};

    /// Creates a protobuf-encoded source and a materialized view on one of its columns, then
    /// checks that both show up in the catalog and that the view has the expected columns.
    #[tokio::test]
    async fn test_create_mv_handler() {
        let proto_file = create_proto_file(PROTO_FILE_DATA);
        let sql = format!(
            r#"CREATE SOURCE t1
    WITH (connector = 'kinesis')
    FORMAT PLAIN ENCODE PROTOBUF (message = '.test.TestRecord', schema.location = 'file://{}')"#,
            proto_file.path().to_str().unwrap()
        );
        let frontend = LocalFrontend::new(Default::default()).await;
        frontend.run_sql(sql).await.unwrap();

        let sql = "create materialized view mv1 as select t1.country from t1";
        frontend.run_sql(sql).await.unwrap();

        let session = frontend.session_ref();
        let catalog_reader = session.env().catalog_reader().read_guard();
        let schema_path = SchemaPath::Name(DEFAULT_SCHEMA_NAME);

        // The source must be registered in the catalog.
        let (source, _) = catalog_reader
            .get_source_by_name(DEFAULT_DATABASE_NAME, schema_path, "t1")
            .unwrap();
        assert_eq!(source.name, "t1");

        // The materialized view must be registered as a created table.
        let (table, _) = catalog_reader
            .get_created_table_by_name(DEFAULT_DATABASE_NAME, schema_path, "mv1")
            .unwrap();
        assert_eq!(table.name(), "mv1");

        // Map every column name of the view to its data type for comparison.
        let columns = table
            .columns
            .iter()
            .map(|col| (col.name(), col.data_type().clone()))
            .collect::<HashMap<&str, DataType>>();

        let city_type = StructType::new(vec![
            ("address", DataType::Varchar),
            ("zipcode", DataType::Varchar),
        ])
        .into();
        // Besides the selected `country` struct column, the view is expected to carry the
        // `ROW_ID_COLUMN_NAME` and `RW_TIMESTAMP_COLUMN_NAME` columns.
        let expected_columns = maplit::hashmap! {
            ROW_ID_COLUMN_NAME => DataType::Serial,
            "country" => StructType::new(
                 vec![("address", DataType::Varchar),("city", city_type),("zipcode", DataType::Varchar)],
            )
            .into(),
            RW_TIMESTAMP_COLUMN_NAME => DataType::Timestamptz,
        };
        assert_eq!(columns, expected_columns, "{columns:#?}");
    }

    /// Checks how select items without an explicit alias are handled.
    #[tokio::test]
    async fn test_no_alias() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // A single aggregate without an alias is accepted.
        let sql = "create materialized view mv0 as select count(x) from t";
        frontend.run_sql(sql).await.unwrap();

        // Two aggregates without aliases are rejected as a duplicate `count` column.
        let sql = "create materialized view mv1 as select count(x), count(*) from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid input syntax: column \"count\" specified more than once"
        );

        // A bare literal requires an explicit alias.
        let sql = "create materialized view mv1 as select 1";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );

        // An `IS NULL` expression requires an explicit alias as well.
        let sql = "create materialized view mv1 as select x is null from t";
        let err = frontend.run_sql(sql).await.unwrap_err();
        assert_eq!(
            err.to_string(),
            "Bind error: An alias must be specified for the 1st expression (counting from 1) in result relation"
        );
    }

    /// Creates materialized views with and without an `ORDER BY` clause.
    #[tokio::test]
    async fn test_create_mv_with_order_by() {
        let frontend = LocalFrontend::new(Default::default()).await;

        let sql = "create table t(x varchar)";
        frontend.run_sql(sql).await.unwrap();

        // Without ORDER BY: the statement succeeds and produces no notices.
        let sql = "create materialized view mv1 as select * from t";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
        assert!(response.notices().is_empty());

        // With ORDER BY: only the statement type is asserted here.
        let sql = "create materialized view mv2 as select * from t order by x";
        let response = frontend.run_sql(sql).await.unwrap();
        assert_eq!(response.stmt_type(), CREATE_MATERIALIZED_VIEW);
    }
}
476}